Spaces:
Runtime error
Runtime error
| """Question answering over a graph.""" | |
| from __future__ import annotations | |
| import re | |
| from typing import Any, Dict, List, Optional | |
| from langchain_core.language_models import BaseLanguageModel | |
| from langchain_core.prompts import BasePromptTemplate | |
| from langchain_core.pydantic_v1 import Field | |
| from langchain.callbacks.manager import CallbackManagerForChainRun | |
| from langchain.chains.base import Chain | |
| from langchain.chains.graph_qa.cypher_utils import CypherQueryCorrector, Schema | |
| from langchain.chains.graph_qa.prompts import CYPHER_GENERATION_PROMPT, CYPHER_QA_PROMPT | |
| from langchain.chains.llm import LLMChain | |
| from langchain.graphs.graph_store import GraphStore | |
# Output-dict key under which GraphCypherQAChain._call stores its list of
# intermediate steps when `return_intermediate_steps` is enabled.
INTERMEDIATE_STEPS_KEY = "intermediate_steps"
def extract_cypher(text: str) -> str:
    """Extract Cypher code from a text.

    If the text contains one or more spans fenced by triple backticks, the
    content of the first span is returned. An optional ``cypher`` language tag
    directly after the opening fence (as in Markdown code fences) is skipped,
    because it is not part of the query. Whitespace inside the fence is
    returned untouched.

    Args:
        text: Text to extract Cypher code from.

    Returns:
        Cypher code extracted from the text, or ``text`` unchanged when it
        contains no fenced span.
    """
    # ``(?:cypher\b)?`` swallows a Markdown info string such as "```cypher";
    # the word boundary keeps identifiers like "cypher_query" intact.
    pattern = r"```(?:cypher\b)?(.*?)```"
    # DOTALL lets the query span several lines; IGNORECASE accepts "Cypher".
    match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
    return match.group(1) if match else text
def construct_schema(
    structured_schema: Dict[str, Any],
    include_types: List[str],
    exclude_types: List[str],
) -> str:
    """Render a textual schema restricted to the allowed types.

    A type name is kept when it appears in ``include_types`` if that list is
    non-empty; otherwise it is kept when it does not appear in
    ``exclude_types``. A relationship is kept only when its start label, end
    label and relationship type are all kept.

    Args:
        structured_schema: Mapping that may hold ``"node_props"``,
            ``"rel_props"`` (both mappings keyed by type name) and
            ``"relationships"`` (a list of mappings with ``"start"``,
            ``"type"`` and ``"end"`` entries). Missing keys count as empty.
        include_types: Allow-list of type names; takes priority when non-empty.
        exclude_types: Deny-list of type names, used when no allow-list is given.

    Returns:
        A string describing the remaining node properties, relationship
        properties and relationship patterns.
    """

    def is_allowed(name: str) -> bool:
        # An allow-list, when present, fully overrides the deny-list.
        if include_types:
            return name in include_types
        return name not in exclude_types

    node_props = {}
    for label, props in structured_schema.get("node_props", {}).items():
        if is_allowed(label):
            node_props[label] = props

    rel_props = {}
    for rel_type, props in structured_schema.get("rel_props", {}).items():
        if is_allowed(rel_type):
            rel_props[rel_type] = props

    relationships = []
    for candidate in structured_schema.get("relationships", []):
        # Checked in the order start, end, type; stops at the first rejection.
        if (
            is_allowed(candidate["start"])
            and is_allowed(candidate["end"])
            and is_allowed(candidate["type"])
        ):
            relationships.append(candidate)

    patterns = [
        f"(:{el['start']})-[:{el['type']}]->(:{el['end']})" for el in relationships
    ]

    header = (
        f"Node properties are the following: \n {node_props}\n"
        f"Relationships properties are the following: \n {rel_props}"
        "\nRelationships are: \n"
    )
    return header + str(patterns)
class GraphCypherQAChain(Chain):
    """Chain for question-answering against a graph by generating Cypher statements.

    The chain asks ``cypher_generation_chain`` for a Cypher statement, runs it
    against ``graph`` and then either returns the rows directly or passes them
    to ``qa_chain`` to phrase the final answer.

    *Security note*: Make sure that the database connection uses credentials
        that are narrowly-scoped to only include necessary permissions.
        Failure to do so may result in data corruption or loss, since the calling
        code may attempt commands that would result in deletion, mutation
        of data if appropriately prompted or reading sensitive data if such
        data is present in the database.
        The best way to guard against such negative outcomes is to (as appropriate)
        limit the permissions granted to the credentials used with this tool.

        See https://python.langchain.com/docs/security for more information.
    """

    graph: GraphStore = Field(exclude=True)
    cypher_generation_chain: LLMChain
    qa_chain: LLMChain
    graph_schema: str
    input_key: str = "query"  #: :meta private:
    output_key: str = "result"  #: :meta private:
    top_k: int = 10
    """Number of results to return from the query"""
    return_intermediate_steps: bool = False
    """Whether or not to return the intermediate steps along with the final answer."""
    return_direct: bool = False
    """Whether or not to return the result of querying the graph directly."""
    cypher_query_corrector: Optional[CypherQueryCorrector] = None
    """Optional cypher validation tool"""

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """
        return [self.input_key]

    @property
    def output_keys(self) -> List[str]:
        """Return the output keys.

        :meta private:
        """
        return [self.output_key]

    @property
    def _chain_type(self) -> str:
        """Return the identifier string of this chain type."""
        return "graph_cypher_chain"

    @classmethod
    def from_llm(
        cls,
        llm: Optional[BaseLanguageModel] = None,
        *,
        qa_prompt: Optional[BasePromptTemplate] = None,
        cypher_prompt: Optional[BasePromptTemplate] = None,
        cypher_llm: Optional[BaseLanguageModel] = None,
        qa_llm: Optional[BaseLanguageModel] = None,
        # The two list defaults below are only read, never mutated.
        exclude_types: List[str] = [],
        include_types: List[str] = [],
        validate_cypher: bool = False,
        qa_llm_kwargs: Optional[Dict[str, Any]] = None,
        cypher_llm_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> GraphCypherQAChain:
        """Initialize from LLM.

        Args:
            llm: Model used for any chain that has no dedicated model.
            qa_prompt: Prompt for the answer chain; defaults to
                ``CYPHER_QA_PROMPT``.
            cypher_prompt: Prompt for the Cypher generation chain; defaults to
                ``CYPHER_GENERATION_PROMPT``.
            cypher_llm: Model dedicated to Cypher generation.
            qa_llm: Model dedicated to answering.
            exclude_types: Type names to leave out of the schema text.
            include_types: Type names to keep in the schema text.
            validate_cypher: When true, build a ``CypherQueryCorrector`` from
                the graph's relationships and attach it to the chain.
            qa_llm_kwargs: Extra keyword arguments for the answer ``LLMChain``;
                may carry its own ``"prompt"``. Not modified by this method.
            cypher_llm_kwargs: Extra keyword arguments for the Cypher
                ``LLMChain``; may carry its own ``"prompt"``. Not modified by
                this method.
            **kwargs: Forwarded to the class constructor; must contain
                ``"graph"``.

        Returns:
            A configured ``GraphCypherQAChain``.

        Raises:
            ValueError: If no model is available for one of the chains, if all
                three of ``llm``, ``cypher_llm`` and ``qa_llm`` are given, if a
                prompt is given together with the matching ``*_llm_kwargs``,
                or if both ``exclude_types`` and ``include_types`` are given.
        """
        if not cypher_llm and not llm:
            raise ValueError("Either `llm` or `cypher_llm` parameters must be provided")
        if not qa_llm and not llm:
            raise ValueError("Either `llm` or `qa_llm` parameters must be provided")
        if cypher_llm and qa_llm and llm:
            raise ValueError(
                "You can specify up to two of 'cypher_llm', 'qa_llm'"
                ", and 'llm', but not all three simultaneously."
            )
        if cypher_prompt and cypher_llm_kwargs:
            raise ValueError(
                "Specifying cypher_prompt and cypher_llm_kwargs together is"
                " not allowed. Please pass prompt via cypher_llm_kwargs."
            )
        if qa_prompt and qa_llm_kwargs:
            raise ValueError(
                "Specifying qa_prompt and qa_llm_kwargs together is"
                " not allowed. Please pass prompt via qa_llm_kwargs."
            )

        # Work on copies so that filling in the default prompt below never
        # leaks into dictionaries owned by the caller.
        use_qa_llm_kwargs = dict(qa_llm_kwargs) if qa_llm_kwargs is not None else {}
        use_cypher_llm_kwargs = (
            dict(cypher_llm_kwargs) if cypher_llm_kwargs is not None else {}
        )
        if "prompt" not in use_qa_llm_kwargs:
            use_qa_llm_kwargs["prompt"] = (
                qa_prompt if qa_prompt is not None else CYPHER_QA_PROMPT
            )
        if "prompt" not in use_cypher_llm_kwargs:
            use_cypher_llm_kwargs["prompt"] = (
                cypher_prompt if cypher_prompt is not None else CYPHER_GENERATION_PROMPT
            )

        qa_chain = LLMChain(llm=qa_llm or llm, **use_qa_llm_kwargs)
        cypher_generation_chain = LLMChain(
            llm=cypher_llm or llm, **use_cypher_llm_kwargs
        )

        if exclude_types and include_types:
            raise ValueError(
                "Either `exclude_types` or `include_types` "
                "can be provided, but not both"
            )

        graph = kwargs["graph"]
        # NOTE(review): the schema is read through `get_structured_schema` here
        # and through `structured_schema` below; confirm that the graph store
        # in use exposes both attributes.
        graph_schema = construct_schema(
            graph.get_structured_schema, include_types, exclude_types
        )

        cypher_query_corrector = None
        if validate_cypher:
            # Default to an empty list so a schema without a "relationships"
            # entry yields an empty corrector schema instead of a TypeError.
            corrector_schema = [
                Schema(el["start"], el["type"], el["end"])
                for el in graph.structured_schema.get("relationships", [])
            ]
            cypher_query_corrector = CypherQueryCorrector(corrector_schema)

        return cls(
            graph_schema=graph_schema,
            qa_chain=qa_chain,
            cypher_generation_chain=cypher_generation_chain,
            cypher_query_corrector=cypher_query_corrector,
            **kwargs,
        )

    def _call(
        self,
        inputs: Dict[str, Any],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        """Generate Cypher statement, use it to look up in db and answer question.

        Args:
            inputs: Mapping that holds the question under ``self.input_key``.
            run_manager: Callback manager for this run; a no-op manager is used
                when omitted.

        Returns:
            A dict with the answer (or the raw query rows when
            ``return_direct`` is set) under ``self.output_key`` and, when
            ``return_intermediate_steps`` is set, the recorded steps under
            ``INTERMEDIATE_STEPS_KEY``.
        """
        _run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager()
        callbacks = _run_manager.get_child()
        question = inputs[self.input_key]

        intermediate_steps: List = []

        generated_cypher = self.cypher_generation_chain.run(
            {"question": question, "schema": self.graph_schema}, callbacks=callbacks
        )

        # Extract Cypher code if it is wrapped in backticks
        generated_cypher = extract_cypher(generated_cypher)

        # Correct Cypher query if enabled
        if self.cypher_query_corrector:
            generated_cypher = self.cypher_query_corrector(generated_cypher)

        _run_manager.on_text("Generated Cypher:", end="\n", verbose=self.verbose)
        _run_manager.on_text(
            generated_cypher, color="green", end="\n", verbose=self.verbose
        )

        intermediate_steps.append({"query": generated_cypher})

        # Retrieve and limit the number of results.
        # The generated Cypher may be empty if the query corrector identifies
        # an invalid schema; in that case the database is not queried at all.
        if generated_cypher:
            context = self.graph.query(generated_cypher)[: self.top_k]
        else:
            context = []

        if self.return_direct:
            # Hand back the (truncated) query rows without consulting the QA chain.
            final_result = context
        else:
            _run_manager.on_text("Full Context:", end="\n", verbose=self.verbose)
            _run_manager.on_text(
                str(context), color="green", end="\n", verbose=self.verbose
            )

            intermediate_steps.append({"context": context})

            result = self.qa_chain(
                {"question": question, "context": context},
                callbacks=callbacks,
            )
            final_result = result[self.qa_chain.output_key]

        chain_result: Dict[str, Any] = {self.output_key: final_result}
        if self.return_intermediate_steps:
            chain_result[INTERMEDIATE_STEPS_KEY] = intermediate_steps

        return chain_result