Spaces:
Sleeping
Sleeping
| import json | |
| import re | |
| from langgraph.graph import StateGraph, START, END | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_groq import ChatGroq | |
| from langchain_openai import ChatOpenAI | |
| from langchain_anthropic import ChatAnthropic | |
| from langchain_mistralai.chat_models import ChatMistralAI | |
| from langchain_cohere import ChatCohere | |
| from typing_extensions import TypedDict | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_community.document_loaders import WikipediaLoader | |
| import networkx as nx | |
| from pyvis.network import Network | |
| def web_search(query: str) -> str: | |
| """ | |
| Search Tavily for a query to answer questions about recent events or general knowledge. | |
| Args: | |
| query: The search query. | |
| """ | |
| search_tool = TavilySearchResults(max_results=2) | |
| # The tool now returns a dictionary, and the actual results are in the 'results' key | |
| search_output = search_tool.invoke({"query": query}) | |
| formatted_output = "\n\n".join([doc["content"] for doc in search_output]) | |
| print(search_output) | |
| # We need to loop through the list of result dictionaries and extract the 'content' from each | |
| return formatted_output | |
| def wiki_search(query: str) -> str: | |
| """ | |
| Search Wikipedia for a query and return maximum 1 results. | |
| Args: | |
| query: The search query. | |
| """ | |
| search_docs = WikipediaLoader(query=query, load_max_docs=1).load() | |
| formatted_search_docs = "\n\n---\n\n".join( | |
| [ | |
| f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>' | |
| for doc in search_docs | |
| ]) | |
| return formatted_search_docs | |
| class AgentState(TypedDict): | |
| """ | |
| State of our agent. It's a dictionary that will | |
| hold all the information passed between the different nodes of our graph. | |
| """ | |
| original_input: str # What the user initially types | |
| text_to_map: str # The text we'll process (might be the same as original_input or come from Wikipedia) | |
| keywords: list # Our list of keywords with importance | |
| connections: list # Our list of (keyword, relation, keyword) triplets | |
| summary: str # The final summary of the graph | |
| image_path: str # The image path we generate | |
| download_path: str # The filename for download | |
| chosen_path: str # The chosen path by llm | |
| class KnowledgeMapperAgent: | |
| def __init__(self, selected_api, api_key, model): | |
| print(f"class KnowledgeMapperAgent: initialized with API: {selected_api} and Model: {model}") | |
| # This block now handles initializing the correct LLM based on user selection | |
| if selected_api == "Gemini": | |
| self.llm = ChatGoogleGenerativeAI(model=model, temperature=0, api_key=api_key) | |
| elif selected_api == "Groq": | |
| self.llm = ChatGroq(model=model, temperature=0, api_key=api_key) | |
| elif selected_api == "OpenAI": | |
| self.llm = ChatOpenAI(model=model, temperature=0, api_key=api_key) | |
| elif selected_api == "Anthropic": | |
| self.llm = ChatAnthropic(model=model, temperature=0, api_key=api_key) | |
| elif selected_api == "Mistral": | |
| self.llm = ChatMistralAI(model=model, temperature=0, api_key=api_key) | |
| elif selected_api == "Cohere": | |
| self.llm = ChatCohere(model=model, temperature=0, api_key=api_key) | |
| else: | |
| raise ValueError(f"Unsupported API provider selected: {selected_api}. Please check the list of supported providers.") | |
| # 1. Initialize our agent (start the graph w/ AgentState) | |
| workflow = StateGraph(AgentState) | |
| # 2. Add the tools | |
| workflow.add_node("decide_path_node", self._decide_path_node) | |
| workflow.add_node("fetch_from_wiki", self._fetch_from_wiki) | |
| workflow.add_node("perform_web_search", self._perform_web_search) | |
| workflow.add_node("extract_keywords", self.extract_keywords) | |
| workflow.add_node("connect_keywords", self.connect_keywords) | |
| workflow.add_node("summarize_map", self.summarize_map) | |
| workflow.add_node("generate_graph_image", self.generate_graph_image) | |
| # 3. Set the entry point to router | |
| workflow.set_entry_point("decide_path_node") | |
| # 4. Add the CONDITIONAL edge | |
| workflow.add_conditional_edges( | |
| "decide_path_node", # Start from router node | |
| self._route_next_step, # Choose the function we will call | |
| { | |
| # This dictionary maps the return value to the name of the next node | |
| "fetch_from_wiki": "fetch_from_wiki", | |
| "perform_web_search": "perform_web_search", | |
| "extract_keywords": "extract_keywords" | |
| } | |
| ) | |
| # 5. Add the regular edges for the rest of the flow | |
| workflow.add_edge("fetch_from_wiki", "extract_keywords") # After wiki, go to extract | |
| workflow.add_edge("perform_web_search", "extract_keywords") # After wiki, go to extract | |
| workflow.add_edge("extract_keywords", "connect_keywords") # After extracting, connect them | |
| workflow.add_edge("connect_keywords", "generate_graph_image") # We generate graph image | |
| workflow.add_edge("generate_graph_image", "summarize_map") # Finally, generate the summary | |
| workflow.add_edge("summarize_map", END) # and end the graph | |
| # 6. Compile the graph | |
| self.graph = workflow.compile() | |
| # Initial agent call | |
| def __call__(self, text: str): | |
| # 1. Define initial state | |
| initial_state = { | |
| "original_input": text, | |
| "text_to_map": "", | |
| "keywords": [], | |
| "connections": [], | |
| "summary":"", | |
| "chosen_path": "" | |
| } | |
| # 2. Run our agent workflow from START to END | |
| final_state = self.graph.invoke(initial_state) | |
| # 3. Agent return the generated html file name | |
| return final_state | |
| def _decide_path_node(self, state: AgentState) -> dict: | |
| print("---NODE: LLM DECIDING PATH---") | |
| user_input = state["original_input"] | |
| print(f"Decide Path Input (first 50 chars): '{user_input[:50]}...'") | |
| router_prompt = f""" | |
| You are an intelligent routing agent. Based on the user's request, you must choose the next tool to use. | |
| The user's request is: "{user_input}" | |
| You have the following tools available: | |
| - "wikipedia_search": Choose this if the user is asking a question or wants to find information on a topic (e.g., "What is deep learning?"). Use for general encyclopedia-like knowledge. | |
| - "web_search": Choose this if the user is asking a question about current events, specific facts, or requires broader, more up-to-date internet information. | |
| - "mindmap_generator": Choose this if the user has provided a full block of text that is already self-contained and ready to be turned into a mind map. | |
| Your response MUST be only one of these single words: "wikipedia_search", "web_search", or "mindmap_generator". | |
| """ | |
| llm_choice_response = self.llm.invoke(router_prompt) | |
| raw_llm_response_content = llm_choice_response.content | |
| print(f"LLM Raw Router Response: '{raw_llm_response_content}'") | |
| llm_choice = raw_llm_response_content.strip().lower() | |
| print(f"LLM Processed Router Choice: '{llm_choice}' (length: {len(llm_choice)})") | |
| updates = {} | |
| if "wikipedia_search" == llm_choice: | |
| updates["chosen_path"] = "fetch_from_wiki" | |
| print("Decision: 'wikipedia_search'. Setting chosen_path to 'fetch_from_wiki'.") | |
| elif "web_search" == llm_choice: # <-- NEW CONDITIONAL ROUTE | |
| updates["chosen_path"] = "perform_web_search" | |
| print("Decision: 'web_search'. Setting chosen_path to 'perform_web_search'.") | |
| elif "mindmap_generator" == llm_choice: | |
| updates["chosen_path"] = "extract_keywords" | |
| updates["text_to_map"] = user_input | |
| print("Decision: 'mindmap_generator'. Setting chosen_path to 'extract_keywords' and text_to_map.") | |
| print(f"text_to_map set to (first 50 chars): '{updates['text_to_map'][:50]}...'") | |
| else: | |
| print(f"WARNING: LLM returned unexpected choice: '{llm_choice}'. Defaulting to mindmap_generator.") | |
| updates["chosen_path"] = "extract_keywords" | |
| updates["text_to_map"] = user_input | |
| print(f"text_to_map set to (first 50 chars): '{updates['text_to_map'][:50]}...'") | |
| return updates | |
| def _route_next_step(self, state: AgentState) -> str: | |
| """ | |
| Routes the graph based on the 'chosen_path' set by _decide_path_node. | |
| This function *only* returns a string for conditional routing. | |
| """ | |
| print(f"---ROUTING: Based on chosen_path '{state.get('chosen_path', 'N/A')}'---") | |
| # Ensure chosen_path is present in state; default to 'extract_keywords' if not. | |
| return state.get("chosen_path", "extract_keywords") | |
| def _fetch_from_wiki(self, state: AgentState) -> dict: | |
| """ | |
| This is a WORKER node. It uses a tool to get data. | |
| """ | |
| print("---NODE: FETCHING FROM WIKIPEDIA---") | |
| question = state["original_input"] | |
| fetched_text = wiki_search(question) | |
| return {"text_to_map": fetched_text} | |
| def _perform_web_search(self, state: AgentState) -> dict: | |
| print("---NODE: PERFORMING WEB SEARCH---") | |
| query = state["original_input"] | |
| fetched_text = web_search(query) # Call the actual web_search function | |
| return {"text_to_map": fetched_text} | |
| def extract_keywords(self, state: AgentState) -> str: | |
| """ | |
| Uses an LLM to extract the keywords. | |
| """ | |
| print("---NODE: LLM EXTRACTION ---") | |
| text_to_process = state["text_to_map"] | |
| # 1. This is the prompt for the . | |
| extraction_prompt = f""" | |
| You are an expert at analyzing text and extracting key concepts. | |
| From the following text, please extract the main concepts and rate their importance on a scale from 1 to 10. | |
| Text to analyze: | |
| \"{text_to_process}\" | |
| Your response MUST be only a valid JSON array of objects, where each object has a "concept" key and an "importance" key. Do not include any other text or explanation. For example: | |
| Text to analyze: | |
| \"Machine learning (ML) is a type of artificial intelligence that allows computers to learn from data without being explicitly programmed. It involves using algorithms to analyze large datasets, identify patterns, and make predictions or decisions based on those patterns. The more data an ML model is trained on, the better it becomes at performing its task.\" | |
| Output: | |
| [ | |
| {{ | |
| "concept": "Machine learning (ML)", | |
| "importance": 10 | |
| }}, | |
| {{ | |
| "concept": "Artificial intelligence", | |
| "importance": 9 | |
| }}, | |
| {{ | |
| "concept": "Algorithms", | |
| "importance": 8 | |
| }}, | |
| {{ | |
| "concept": "Data analysis", | |
| "importance": 8 | |
| }}, | |
| {{ | |
| "concept": "Pattern recognition", | |
| "importance": 7 | |
| }}, | |
| {{ | |
| "concept": "Predictions/Decisions", | |
| "importance": 7 | |
| }}, | |
| ] | |
| """ | |
| # 2. Call the LLM with the extraction prompt | |
| llm_extraction_response = self.llm.invoke(extraction_prompt) | |
| raw_llm_content = llm_extraction_response.content | |
| print(f"Raw LLM response: \n'{raw_llm_content}'") | |
| llm_extraction = [] # Initialize as an empty list for safety | |
| try: | |
| # Use regex to find content between ```json and ``` | |
| # This pattern is more robust as it handles potential leading/trailing whitespace | |
| match = re.search(r'```json\s*(.*?)\s*```', raw_llm_content, re.DOTALL) | |
| if match: | |
| json_string = match.group(1) | |
| print(f"Extracted JSON string: \n'{json_string}'") | |
| llm_extraction = json.loads(json_string) | |
| else: | |
| # If no markdown block is found, try parsing the raw content directly | |
| print("No JSON markdown block found. Attempting to parse raw content directly.") | |
| llm_extraction = json.loads(raw_llm_content) | |
| except json.JSONDecodeError as e: | |
| print(f"Error decoding JSON from LLM: {e}") | |
| print(f"Problematic content (after extraction attempt): '{raw_llm_content}'") | |
| except Exception as e: | |
| print(f"An unexpected error occurred: {e}") | |
| print(f"Problematic content (after extraction attempt): '{raw_llm_content}'") | |
| return {"keywords": llm_extraction} | |
| def connect_keywords(self, state: AgentState) -> dict: | |
| """ | |
| Uses an LLM to identify relationships between the extracted keywords. | |
| """ | |
| print("---NODE: LLM CONNECTING KEYWORDS ---") | |
| keywords_list = state["keywords"] | |
| text_to_map = state["text_to_map"] | |
| if not keywords_list: | |
| print("No keywords found to connect. Returning empty connections.") | |
| return {"connections": []} | |
| # Format keywords for the prompt, so the LLM knows what to connect | |
| # You can choose to pass the original concept list or just the names | |
| # For simplicity, let's just pass the concept names | |
| concept_names = [k["concept"] for k in keywords_list if "concept" in k] | |
| formatted_concepts = "\n- " + "\n- ".join(concept_names) if concept_names else "No concepts provided." | |
| connection_prompt = f""" | |
| You are an expert at identifying semantic relationships between concepts in a text. | |
| Given the following original text and a list of key concepts extracted from it: | |
| Original Text: | |
| \"{text_to_map}\" | |
| Key Concepts: | |
| {formatted_concepts} | |
| Identify all meaningful relationships between these key concepts based *only* on the provided Original Text. | |
| For each relationship, provide a "source" concept, a "relation" (a concise description of the relationship, e.g., "is a type of", "uses", "enables", "is part of"), and a "target" concept. | |
| Your response MUST be only a valid JSON array of objects, where each object has a "source" key (string), a "relation" key (string), and a "target" key (string). Do not include any other text or explanation. | |
| For example: | |
| [ | |
| {{"source": "Machine learning", "relation": "is a type of", "target": "Artificial Intelligence"}}, | |
| {{"source": "ML model", "relation": "is trained on", "target": "Large datasets"}}, | |
| {{"source": "Data analysis", "relation": "identifies", "target": "Patterns"}} | |
| ] | |
| """ | |
| # 2. Call the LLM with the connection prompt | |
| llm_connections_response = self.llm.invoke(connection_prompt) | |
| raw_llm_content = llm_connections_response.content | |
| print(f"Raw LLM response (connections): \n'{raw_llm_content}'") | |
| connections = [] # Initialize as an empty list for safety | |
| try: | |
| # Use regex to find content between ```json and ``` | |
| match = re.search(r'```json\s*(.*?)\s*```', raw_llm_content, re.DOTALL) | |
| if match: | |
| json_string = match.group(1) | |
| print(f"Extracted JSON string (connections): \n'{json_string}'") | |
| connections = json.loads(json_string) | |
| else: | |
| # If no markdown block is found, try parsing the raw content directly | |
| print("No JSON markdown block found for connections. Attempting to parse raw content directly.") | |
| connections = json.loads(raw_llm_content) | |
| except json.JSONDecodeError as e: | |
| print(f"Error decoding JSON from LLM (connections): {e}") | |
| print(f"Problematic content (connections): '{raw_llm_content}'") | |
| except Exception as e: | |
| print(f"An unexpected error occurred (connections): {e}") | |
| print(f"Problematic content (connections): '{raw_llm_content}'") | |
| return {"connections": connections} | |
| def summarize_map(self, state: AgentState) -> dict: | |
| print("---NODE: SUMMARIZING MAP---") | |
| keywords = state["keywords"] | |
| connections = state["connections"] | |
| original_text = state["original_input"] | |
| if not keywords and not connections: | |
| print("No graph data to summarize. Returning empty summary.") | |
| return {"summary": "No conceptual map data was generated."} | |
| formatted_keywords = "\n".join([f"- {k['concept']} (Importance: {k['importance']})" for k in keywords]) | |
| formatted_connections = "\n".join([f"- {c['source']} {c['relation']} {c['target']}" for c in connections]) | |
| summary_prompt = f""" | |
| You are an expert at explaining complex information. | |
| Based on the following original text, extracted key concepts, and their relationships, provide a concise summary that explains the core topic and how these concepts are interconnected. | |
| Original Text (for context): | |
| \"{original_text}\" | |
| Key Concepts: | |
| {formatted_keywords} | |
| Identified Relationships: | |
| {formatted_connections} | |
| Focus on explaining the conceptual map. Start with the main topic and then describe the key concepts and their relationships, in a coherent paragraph. | |
| """ | |
| summary_response = self.llm.invoke(summary_prompt) | |
| print(f"Generated Summary: \n'{summary_response.content}'") | |
| return {"summary": summary_response.content} | |
| def generate_graph_image(self, state: AgentState) -> dict: | |
| """ | |
| Generates an interactive and visually appealing conceptual map using pyvis. | |
| The output is an HTML string that can be rendered directly in Gradio. | |
| This function now returns the full HTML content in the 'image_path' key. | |
| """ | |
| print("---NODE: GENERATING INTERACTIVE GRAPH (pyvis) ---") | |
| keywords = state["keywords"] | |
| connections = state["connections"] | |
| output_filename = "concept_map.html" # Temporary file to generate HTML | |
| if not keywords and not connections: | |
| print("No keywords or connections to generate a graph. Skipping image generation.") | |
| # Return an empty HTML paragraph for Gradio to display | |
| return {"image_path": "<p>No graph data could be generated from the text.</p>"} | |
| # 1. Create a NetworkX graph to hold the data structure (same as before) | |
| G = nx.DiGraph() | |
| # Add nodes with attributes that pyvis can use, like 'size' and 'title' (for hover text) | |
| for keyword in keywords: | |
| concept = keyword.get("concept") | |
| importance = keyword.get("importance", 5) | |
| if concept: | |
| G.add_node( | |
| concept, | |
| size=15 + (importance * 2), # Calculate node size based on importance | |
| title=f"Importance: {importance}", # This creates a tooltip on hover | |
| importance_val=importance # Store for coloring later | |
| ) | |
| # Add edges with labels (same as before) | |
| for conn in connections: | |
| source = conn.get("source") | |
| target = conn.get("target") | |
| relation = conn.get("relation", "") | |
| if source in G and target in G: | |
| G.add_edge(source, target, label=relation) | |
| # 2. Initialize a pyvis Network for visualization | |
| # Set a specific height, a dark background, and white font for a modern look. | |
| net = Network(height="750px", width="100%", bgcolor="#222222", font_color="white", directed=True) | |
| # 3. Transfer the graph structure from NetworkX to pyvis | |
| net.from_nx(G) | |
| # 4. Apply custom styling to nodes after they've been added to the pyvis net | |
| for node in net.nodes: | |
| importance = node.get("importance_val", 5) | |
| if importance >= 9: | |
| node["color"] = "#ff4757" # Red for highest importance | |
| elif importance >= 7: | |
| node["color"] = "#ffa502" # Orange for high importance | |
| elif importance >= 5: | |
| node["color"] = "#2ed573" # Green for medium importance | |
| else: | |
| node["color"] = "#1e90ff" # Blue for lower importance | |
| # 5. Add physics layout options for a more dynamic and readable graph | |
| # These settings prevent nodes from overlapping and create a nice "springy" effect. | |
| net.set_options(""" | |
| var options = { | |
| "physics": { | |
| "repulsion": { | |
| "centralGravity": 0.2, | |
| "springLength": 100, | |
| "springConstant": 0.05, | |
| "nodeDistance": 150, | |
| "damping": 0.09 | |
| }, | |
| "maxVelocity": 50, | |
| "minVelocity": 0.1, | |
| "solver": "repulsion" | |
| } | |
| } | |
| """) | |
| style_tag = """ | |
| <style> | |
| body, html { | |
| margin: 0; | |
| padding: 0; | |
| width: 100%; | |
| height: 100%; | |
| overflow: hidden; /* This is the key property to hide scrollbars */ | |
| } | |
| #mynetwork { | |
| width: 100%; | |
| height: 100%; | |
| } | |
| </style> | |
| """ | |
| net.html = net.html.replace("</head>", style_tag + "</head>") | |
| # 6. Generate the HTML and return its content as a string | |
| try: | |
| # save_graph writes the complete HTML structure to a file | |
| net.save_graph(output_filename) | |
| # Read the content of the generated file | |
| with open(output_filename, 'r', encoding='utf-8') as f: | |
| html_content = f.read() | |
| print(f"Interactive conceptual map HTML generated successfully.") | |
| # The Gradio gr.HTML component can directly render this string | |
| return {"image_path": html_content, "download_path": output_filename} | |
| except Exception as e: | |
| print(f"Error generating graph image with pyvis: {e}") | |
| return {"image_path": f"<p>An error occurred while generating the graph: {e}</p>"} |