Spaces:
Running
Running
| from typing import List, Optional, Callable, Any | |
| import logging | |
| import re | |
| from thefuzz import fuzz | |
| from langchain.output_parsers.openai_tools import JsonOutputToolsParser | |
| from langchain_core.runnables import RunnableSequence | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.language_models.llms import LLM | |
| from langchain_core.messages import AIMessage | |
| from langgraph.constants import END | |
| from pydantic import BaseModel, Field | |
| from ask_candid.agents.schema import AgentState | |
| from ask_candid.services.org_search import OrgSearch | |
# Module-wide organization search client; `generate_org_link_dict` calls it once per organization name.
search = OrgSearch()

# Set the log line format (`basicConfig` configures the root logger), then expose a
# module-level logger that emits INFO and above.
logging.basicConfig(format="[%(levelname)s] (%(asctime)s) :: %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Tool schema passed to `llm.bind_tools` in `extract_org_links_from_chatbot`.
# NOTE(review): the class docstring and the field description below are presumably forwarded
# to the model as the tool/argument descriptions -- treat wording changes as prompt changes.
class OrganizationNames(BaseModel):
    """List of names of social-sector organizations, such as nonprofits and foundations."""

    # The extractor reads this field from the first tool call via `args["orgnames"]`.
    orgnames: List[str] = Field(description="List of organization names")
def extract_org_links_from_chatbot(chatbot_output: str, llm: LLM):
    """Extract the organization names mentioned in a chatbot response.

    The text is embedded in an extraction prompt and sent to ``llm`` with
    ``OrganizationNames`` bound as a tool; the ``orgnames`` argument of the first
    tool call is returned.

    Args:
        chatbot_output (str): The chatbot output containing organization names and other content.
        llm (LLM): Model used for the extraction; it must support ``bind_tools``.

    Returns:
        list: Organization names extracted from the text. The function is best-effort:
        it returns an empty list (and logs the reason) when the model makes no tool
        call, when ``orgnames`` is not a list, or when the chain raises. It never
        propagates an exception to the caller.
    """
    # `{{` / `}}` are literal braces for ChatPromptTemplate; `{chatbot_output}` is the only variable.
    prompt_text = """Extract only the names of officially recognized organizations, foundations, and government entities
from the text below. Do not include any entries that contain descriptions, regional identifiers, or explanations
within parentheses or following the name. Strictly exclude databases, resources, crowdfunding platforms, and general
terms. Provide the output only in the specified JSON format.
input text below:
```{chatbot_output}```
output format:
{{
'orgnames' : [list of organization names without any additional descriptions or identifiers]
}}
"""
    try:
        chain = RunnableSequence(
            ChatPromptTemplate.from_template(prompt_text),
            llm.bind_tools([OrganizationNames]),
            JsonOutputToolsParser(),
        )
        tool_calls = chain.invoke({"chatbot_output": chatbot_output})
    except Exception as e:
        # Link extraction is an optional enrichment step; a failure must not break the chat flow.
        logger.warning("Organization name extraction failed: %s", e)
        return []

    # The parser yields one entry per tool call; no entry means the model found nothing to report.
    if not tool_calls:
        logger.info("No organization names found in chatbot output")
        return []

    try:
        org_names = tool_calls[0]["args"].get("orgnames", [])
    except (KeyError, IndexError, TypeError, AttributeError) as e:
        logger.warning("Unexpected tool call format while extracting organization names: %s", e)
        return []

    if not isinstance(org_names, list):
        logger.warning("Unexpected output format: 'orgnames' should be a list")
        return []
    return org_names
def is_similar(name: str, list_of_dict: list, threshold: int = 80):
    """Return True if ``name`` fuzzy-matches the ``"name"`` of any item in ``list_of_dict``.

    Comparison is case-insensitive and uses ``fuzz.ratio``; a score greater than or
    equal to ``threshold`` counts as a match.

    Args:
        name (str): The name to look for.
        list_of_dict (list): Items that are indexed with ``item["name"]``; items without
            that key, or whose value has no ``lower()``, are logged and skipped.
        threshold (int): Minimum ``fuzz.ratio`` score that counts as similar. Defaults to 80.

    Returns:
        bool: True on the first sufficiently similar item, otherwise False. False is also
        returned (after logging) when ``name`` is not string-like or a ``TypeError`` occurs
        while iterating or comparing.
    """
    # Normalize the needle once; a non-string `name` is the caller's problem, not a bad item.
    try:
        target = name.lower()
    except AttributeError:
        logger.warning("AttributeError: Non-string name %r passed to is_similar", name)
        return False

    try:
        for item in list_of_dict:
            try:
                candidate = item["name"].lower()
            except KeyError:
                # Handle cases where 'name' key might be missing in dictionary
                logger.warning("KeyError: Missing 'name' key in dictionary item %s", item)
                continue
            except AttributeError:
                # Handle non-string name values in dictionary items
                logger.warning("AttributeError: Non-string 'name' in dictionary item %s", item)
                continue
            if fuzz.ratio(target, candidate) >= threshold:
                return True
    except TypeError as e:
        # Non-iterable collection, non-subscriptable item, or incomparable threshold.
        logger.warning("TypeError: %s", e)
        return False
    return False
def generate_org_link_dict(org_names_list: list):
    """Map organization names to their Candid profile URLs, where available.

    For each name in ``org_names_list`` the module-level ``search`` client is called with
    ``name_only=True``. Only the first result is considered: it is accepted when
    ``is_similar`` matches the requested name against the result's ``"names"`` entries and
    the result carries a ``"candid_entity_id"``. In every other case, including any error
    during the lookup, the name maps to an empty string.

    Args:
        org_names_list (list): Organization names (str) to retrieve Candid profile links for.

    Returns:
        dict: One key per requested name; values are Candid profile URLs or empty strings.

    Example:
        generate_org_link_dict(['New York-Presbyterian Hospital'])
        # {'New York-Presbyterian Hospital': 'https://app.candid.org/profile/6915255'}
    """
    link_dict = {}
    for org in org_names_list:
        # Start from "no link" so every requested name appears in the result no matter
        # which branch or error ends the lookup below.
        link_dict[org] = ""
        try:
            response = search(org, name_only=True)
            if not response:
                continue
            top_hit = response[0]
            if not is_similar(org, top_hit.get("names", [])):
                continue
            candid_entity_id = top_hit.get("candid_entity_id")
            if candid_entity_id:
                link_dict[org] = f"https://app.candid.org/profile/{candid_entity_id}"
        except Exception as e:
            # Best-effort per organization: one failed lookup must not discard the others.
            logger.warning("An error occurred for organization '%s': %s", org, e)
    return link_dict
def embed_org_links_in_text(input_text: str, org_link_dict: dict):
    """Wrap organization names in ``input_text`` with links and append a Candid info message.

    All names are replaced in a single pass over the original text, so a name can never
    match inside markup inserted for another name, and when names overlap (e.g. "Ford" and
    "Ford Foundation") the longest one wins.

    Args:
        input_text (str): The text containing organization names.
        org_link_dict (dict): Mapping of organization names to URLs. Entries with an empty
            name or an empty URL are ignored.

    Returns:
        str: The text with linked organization names followed by the Candid message. If
        anything goes wrong, the error is logged and ``input_text`` is returned unchanged.
    """
    try:
        linkable = {name: url for name, url in org_link_dict.items() if name and url}
        linked_text = input_text
        if linkable:
            # Longest names first: regex alternation takes the first alternative that matches
            # at a position, so this makes "Ford Foundation" beat "Ford".
            pattern = re.compile(
                "|".join(re.escape(name) for name in sorted(linkable, key=len, reverse=True))
            )

            def _to_anchor(match):
                # A callable replacement is inserted verbatim, so backslashes in a name
                # are not interpreted as regex template escapes.
                org_name = match.group(0)
                return (
                    f"<a href={linkable[org_name]} target='_blank' rel='noreferrer' "
                    f"class='candid-org-link'>{org_name}</a>"
                )

            linked_text = pattern.sub(_to_anchor, input_text)

        # Append Candid information message at the end
        return linked_text + (
            "<p class='candid-app-link'> "
            "Visit <a href=https://app.candid.org/ target='_blank' rel='noreferrer' class='candid-org-link'>Candid</a> "
            "to get nonprofit information you need.</p>"
        )
    except Exception as e:
        # Linking is cosmetic; on any failure fall back to the untouched text.
        logger.warning("Could not embed organization links: %s", e)
        return input_text
def has_org_name(
    state: AgentState,
    llm: LLM,
    user_callback: Optional[Callable[[str], Any]] = None
) -> AgentState:
    """Decide whether the latest message mentions organizations that can be linked.

    Organization names are extracted from the content of the last message in
    ``state["messages"]`` and looked up; the returned update tells the graph where to go next.

    Parameters
    ----------
    state : AgentState
        Current agent state; its ``"messages"`` list must contain at least one message.
    llm : LLM
        Model handed to ``extract_org_links_from_chatbot`` for name extraction.
    user_callback : Optional[Callable[[str], Any]], optional
        Optional UI callback used to report progress to the user, by default None.
        A failing callback is logged and otherwise ignored.

    Returns
    -------
    AgentState
        ``{"next": "insert_org_link", "org_dict": ...}`` when a link dictionary was built,
        otherwise ``{"next": END, "messages": ...}`` with the messages unchanged.
    """
    logger.info("---HAS ORG NAMES?---")

    if user_callback is not None:
        try:
            user_callback("Checking for relevant organizations")
        except Exception as ex:
            logger.warning("User callback was passed in but failed: %s", ex)

    history = state["messages"]
    latest_text = history[-1].content

    org_names = extract_org_links_from_chatbot(latest_text, llm=llm)
    # Skip the lookups entirely when nothing was extracted.
    if org_names:
        org_links = generate_org_link_dict(org_names)
    else:
        org_links = {}

    if not org_links:
        logger.info("---NO ORG NAMES FOUND---")
        return {"next": END, "messages": history}

    logger.info("---FOUND ORG NAMES---")
    return {"next": "insert_org_link", "org_dict": org_links}
def insert_org_link(state: AgentState) -> AgentState:
    """
    Rebuild the latest message with organization links embedded in its content.

    Args:
        state (dict): The current state; reads ``"messages"`` (the last entry supplies the
            text) and ``"org_dict"`` (organization name -> URL mapping).

    Returns:
        dict: ``{"messages": [AIMessage]}`` where the message content is the linked text.

    Note:
        The last entry of ``state["messages"]`` is removed in place before the linked
        replacement is returned.
    """
    logger.info("---INSERT ORG LINKS---")

    history = state["messages"]
    original_text = history[-1].content
    # Drop the un-linked message: the same content is returned again below, with links.
    history.pop()

    linked_text = embed_org_links_in_text(original_text, state["org_dict"])
    return {"messages": [AIMessage(content=linked_text)]}