from docling.document_converter import DocumentConverter
from pydantic import BaseModel, Field
import os
from typing import Optional, Any, Literal, Dict, List, Tuple, Type, Annotated, Union
from operator import add
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from langchain_openai import ChatOpenAI
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser

import gradio as gr
import contextlib
from io import StringIO
import docx
from pathlib import Path
import re
from dotenv import load_dotenv

load_dotenv()

# Global LLM configuration; setup_api_key() fills in the model and endpoint.
USE_GOOGLE = False
API_KEY = os.environ.get("NEBIUS_KEY")
MODEL_NAME = None
ENDPOINT_URL = None

NEBIUS_MODELS = [
    "meta-llama/Llama-2-7b-chat-hf",
    "mistralai/Mistral-7B-Instruct-v0.2",
    "microsoft/DialoGPT-medium",
    "openai/gpt-3.5-turbo",
    "Qwen2.5-Coder-7B",
    "QwQ-32B"
]


def list_nebius_models():
    """List all available models from the Nebius API."""
    try:
        import requests

        headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json"
        }

        response = requests.get(
            f"{ENDPOINT_URL}models",
            headers=headers,
            timeout=10
        )

        if response.status_code == 200:
            models = response.json()
            print("Available models:")
            for model in models.get('data', []):
                print(f"  - {model.get('id', 'Unknown')}")
            return [model.get('id') for model in models.get('data', [])]
        else:
            print(f"Failed to fetch models: {response.status_code}")
            print(f"Response: {response.text}")
            return []

    except Exception as e:
        print(f"Error fetching models: {str(e)}")
        return []


def test_available_models():
    """Test which models are actually available."""
    global MODEL_NAME

    available_models = list_nebius_models()

    if available_models:
        print(f"Found {len(available_models)} models from API")
        test_models = available_models[:6]
    else:
        # Fall back to a few common model names if the API listing failed.
        test_models = [
            "gpt-3.5-turbo",
            "gpt-4",
            "claude-3-haiku",
            "llama-2-7b-chat",
            "mistral-7b-instruct",
            "qwen-7b-chat"
        ]

    for model in test_models:
        try:
            print(f"Testing model: {model}")
            MODEL_NAME = model

            llm = ChatOpenAI(
                model=model,
                api_key=API_KEY,
                base_url=ENDPOINT_URL,
                max_completion_tokens=50,
                timeout=10,
                temperature=0
            )

            llm.invoke("Hello")
            print(f"✅ {model} works!")
            return model

        except Exception as e:
            print(f"❌ {model} failed: {str(e)}")
            continue

    print("⚠️ No working models found")
    return None


def setup_api_key(nebius_key=None, model_name=None):
    """Configure the API key, endpoint, and model; an explicit key wins over the environment."""
    global API_KEY, MODEL_NAME, ENDPOINT_URL, USE_GOOGLE

    # A key passed in explicitly takes precedence over the environment variable.
    if nebius_key:
        API_KEY = nebius_key
        ENDPOINT_URL = "https://api.studio.nebius.com/v1/"

        if model_name:
            MODEL_NAME = model_name
        else:
            working_model = test_available_models()
            if working_model:
                MODEL_NAME = working_model
            else:
                print("No working models found")
                return False

        print(f"Using user-provided Nebius API key with model: {MODEL_NAME}")
        return True

    # Otherwise fall back to the NEBIUS_KEY environment variable.
    if API_KEY:
        ENDPOINT_URL = "https://api.studio.nebius.com/v1/"

        if model_name:
            MODEL_NAME = model_name
        else:
            working_model = test_available_models()
            if working_model:
                MODEL_NAME = working_model
            else:
                print("No working models found")
                return False

        print(f"Using Nebius API key from environment variable with model: {MODEL_NAME}")
        return True

    print("No API key found. Please provide a Nebius API key.")
    return False


setup_api_key()
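
# Illustrative usage (not executed): override the environment key at runtime with
# an explicit key and model. The key string below is a placeholder, not a real
# credential, and the model name is just one of the NEBIUS_MODELS candidates.
#
#   setup_api_key(nebius_key="<your-nebius-key>", model_name="Qwen2.5-Coder-7B")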


# Structured response schemas for the agents.
class ResearchSummary(BaseModel):
    key_findings: List[str] = Field(..., description="A list of the most important findings from the research paper.")
    methodology: str = Field(..., description="A brief description of the methodology used in the research.")
    limitations: List[str] = Field(..., description="A list of the limitations of the study as identified by the authors or the agent.")


class FutureScope(BaseModel):
    identified_gaps: List[str] = Field(..., description="List of identified research gaps based on the provided paper(s).")
    suggested_directions: List[str] = Field(..., description="Concrete suggestions for future research directions or next studies.")
    synthesis: str = Field(..., description="A brief synthesis of how these future directions build upon the provided literature.")


class MultiStepPlan(BaseModel):
    reasoning: str = Field("", description="The multi-step reasoning required to break the user query down into a plan.")
    plan: List[Literal["summary_agent", "synthesis_agent", "future_scope_agent", "critique_agent"]] = Field(
        default_factory=list,
        description="The ordered list of agents required to fulfill the user request, as determined by the orchestrator."
    )


class PaperSummary(BaseModel):
    key_findings: List[str] = Field(
        default_factory=lambda: ["No key findings available due to processing error"],
        description="List of key findings from the paper"
    )
    methodology: str = Field(
        default="Methodology not available due to processing error",
        description="Summary of the methodology used in the paper"
    )
    conclusion: str = Field(
        default="Conclusion not available due to processing error",
        description="Summary of the paper's conclusion"
    )


class AgentDescription(TypedDict):
    """Agent description containing the title, description, and system prompt."""
    title: str
    description: str
    system_prompt: str


class ResearchAgentState(BaseModel):
    """State for the research agent."""
    research_papers: Annotated[List[Tuple[str, str]], add] = Field(default_factory=list)
    summary: Annotated[List[Dict], add] = Field(default_factory=list)
    user_query: str = Field(default="")
    phase: str = Field(default="PLAN")
    plan: List[str] = Field(default_factory=list)
    messages: Annotated[List[Tuple[str, str]], add] = Field(default_factory=list)
    critique: Optional[str] = Field(default=None)
    available_agents: Dict[str, Dict] = Field(default_factory=dict)
    final_answer: Optional[str] = Field(default=None)
    max_iterations: int = Field(default=1)
    synthesis_of_findings: Optional[str] = Field(default=None)
    future_directions_report: Optional[str] = Field(default=None)
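
# A minimal sketch (illustrative, not executed) of what the Annotated[..., add]
# reducers buy us: when a node returns a partial update such as
# {"messages": [("summary_agent", "done")]}, LangGraph concatenates it onto the
# existing list via operator.add instead of overwriting the field:
#
#   state = ResearchAgentState(messages=[("user_query", "hi")])
#   merged = add(state.messages, [("summary_agent", "done")])
#   # merged == [("user_query", "hi"), ("summary_agent", "done")]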


general_prefix = """
You are part of a collaborative multi-agent system called the *Academic Research Assistant*.
This system consists of specialized agents working together to analyze, synthesize, and critique academic literature.
Each agent has a distinct role. You are encouraged to build upon the work of other agents to produce a comprehensive and insightful analysis.
"""

summary_prompt = """
You are a diligent research assistant. Your task is to read the provided research paper and extract the most critical information.

Focus on the following key areas:
1. **Key Findings:** What were the main results and conclusions of the study? List them as clear, concise bullet points.
2. **Methodology:** Briefly describe the methodology, including the techniques, dataset, and experiments conducted.
3. **Limitations:** Identify any limitations of the study that were mentioned by the authors.

Provide your output in a structured format. Do not add any interpretation; stick strictly to the information present in the paper.
"""

synthesis_prompt = """
You are a research analyst specializing in literature reviews. You have been provided with summaries from one or more research papers.

Your task is to synthesize this information into a cohesive narrative.
1. Identify common themes, findings, and methodologies across the papers.
2. Highlight any conflicting or divergent results.
3. Create a single, flowing text that summarizes the current state of research based *only* on the provided information.

Do not introduce outside knowledge. Your synthesis should serve as a high-level overview for someone trying to understand the field as defined by these papers.
"""

future_scope_prompt = """
You are an experienced academic advisor with a knack for identifying promising research avenues.
Based on the provided research summaries and synthesis, your goal is to propose a clear path for future work.

Follow these steps:
1. **Identify Research Gaps:** Based on the limitations and findings of the papers, what questions remain unanswered? What are the clear gaps in the current body of knowledge?
2. **Suggest Future Directions:** Propose 2-3 concrete, actionable research projects that could address these gaps. For each suggestion, briefly explain:
   - The research question.
   - A potential methodology.
   - The expected contribution to the field.
3. **Write a Concluding Synthesis:** Briefly summarize why these future directions are a logical and important next step in this research area.

Your tone should be formal and academic. The suggestions must be directly inspired by the provided context.
"""

critique_prompt = """
You are a peer reviewer. Your task is to provide constructive feedback on the generated research analysis (synthesis and future scope).

Evaluate the analysis based on the following criteria:
- **Clarity and Cohesion:** Is the synthesis clear, well-structured, and easy to understand?
- **Logical Flow:** Do the suggested future directions logically follow from the identified gaps and the provided literature?
- **Actionability:** Are the future scope suggestions concrete and specific enough to be pursued?
- **Completeness:** Does the analysis seem to have missed any obvious connections or gaps present in the source material?

Provide brief, constructive feedback highlighting points of improvement.
Provide a quality flag:
- **EXCELLENT**: The analysis is clear, logical, and insightful.
- **NEEDS REVISION**: The analysis has flaws in logic, clarity, or completeness that need to be addressed.
"""

summary_agent_description = AgentDescription(
    title="summary_agent",
    description="Summarizes the key findings, methodology, and limitations of a single research paper.",
    system_prompt=general_prefix + summary_prompt
)
synthesis_agent_description = AgentDescription(
    title="synthesis_agent",
    description="Synthesizes information from multiple paper summaries into a cohesive literature review.",
    system_prompt=general_prefix + synthesis_prompt
)
future_scope_agent_description = AgentDescription(
    title="future_scope_agent",
    description="Identifies research gaps and suggests concrete directions for future work based on the literature.",
    system_prompt=general_prefix + future_scope_prompt
)
critique_agent_description = AgentDescription(
    title="critique_agent",
    description="Provides peer-review style feedback on the generated synthesis and future scope analysis.",
    system_prompt=general_prefix + critique_prompt
)

available_agents = {
    "summary_agent": summary_agent_description,
    "synthesis_agent": synthesis_agent_description,
    "future_scope_agent": future_scope_agent_description,
    "critique_agent": critique_agent_description,
}


def read_file_content(file: Union[str, Path]) -> str:
    """Read a .txt/.md, .pdf, or .docx file into plain text."""
    file_path = Path(file)
    suffix = file_path.suffix.lower()

    if suffix in (".txt", ".md"):
        return file_path.read_text(encoding="utf-8")

    elif suffix == ".pdf":
        converter = DocumentConverter()
        result = converter.convert(file_path)
        return result.document.export_to_markdown()

    elif suffix == ".docx":
        return "\n".join(p.text for p in docx.Document(file_path).paragraphs)

    else:
        return ""


def call_llm(system_prompt, user_prompt, response_format=None):
    """Call the LLM with a system and user prompt, optionally parsing into a Pydantic model."""
    global API_KEY, MODEL_NAME, ENDPOINT_URL

    if not API_KEY:
        print("Error: API key is not set")
        if response_format and hasattr(response_format, "__name__"):
            try:
                if response_format.__name__ == "MultiStepPlan":
                    return MultiStepPlan(
                        reasoning="Error occurred: API key not set",
                        plan=["summary_agent", "synthesis_agent", "future_scope_agent"]
                    )
                elif response_format.__name__ == "PaperSummary":
                    return PaperSummary()
                else:
                    return response_format()
            except Exception as e:
                print(f"Failed to create default instance: {str(e)}")
        return None

    try:
        if USE_GOOGLE:
            llm = ChatGoogleGenerativeAI(
                model=MODEL_NAME,
                google_api_key=API_KEY,
                temperature=0
            )
        else:
            llm = ChatOpenAI(
                model=MODEL_NAME,
                api_key=API_KEY,
                base_url=ENDPOINT_URL,
                max_completion_tokens=None,
                timeout=60,
                max_retries=2,
                temperature=0
            )

        if response_format is not None:
            llm = llm.with_structured_output(response_format)

        prompt = ChatPromptTemplate.from_messages([
            ("system", "{system_prompt}"),
            ("user", "{user_prompt}")
        ])

        chain = prompt | llm

        print(f"Calling model: {MODEL_NAME}")
        response = chain.invoke({
            "system_prompt": system_prompt,
            "user_prompt": user_prompt
        })

        return response

    except Exception as e:
        print(f"Error in call_llm: {str(e)}")
        if hasattr(e, 'response') and hasattr(e.response, 'json'):
            try:
                error_details = e.response.json()
                print(f"API Error details: {error_details}")
            except Exception:
                pass

        # Fall back to a sensible default instance of the requested schema.
        if response_format and hasattr(response_format, "__name__"):
            try:
                if response_format.__name__ == "MultiStepPlan":
                    return MultiStepPlan(
                        reasoning="Error occurred while calling the LLM API. Using default plan.",
                        plan=["summary_agent", "synthesis_agent", "future_scope_agent"]
                    )
                elif response_format.__name__ == "PaperSummary":
                    return PaperSummary()
                else:
                    return response_format()
            except Exception as e:
                print(f"Failed to create default instance: {str(e)}")
        return None
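
# Example usage (illustrative; assumes a configured API key). With a Pydantic
# schema as response_format, the call returns a parsed instance, or a default
# instance on failure:
#
#   summary = call_llm(
#       system_prompt=general_prefix + summary_prompt,
#       user_prompt="Full paper text goes here...",
#       response_format=PaperSummary,
#   )
#   print(summary.key_findings)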


def serialize_messages(messages: List[Tuple[str, str]]) -> str:
    """Return a formatted message history of previous messages."""
    return "\n" + "\n".join(f"**{role}:**\n{content}" for role, content in messages)


def strip_think_blocks(text: str) -> str:
    """Remove <think>...</think> reasoning blocks emitted by some models."""
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
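
# Quick examples (illustrative) of both helpers:
#
#   serialize_messages([("user_query", "hi")])
#   # -> "\n**user_query:**\nhi"
#
#   strip_think_blocks("<think>internal reasoning</think>Final answer")
#   # -> "Final answer"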


def type_conversion(obj: Any, target_type: Type) -> Dict:
    """Return the object as a Gradio-compatible dictionary."""
    if isinstance(obj, target_type):
        result_dict = obj.model_dump()
    elif isinstance(obj, Dict):
        result_dict = obj
    else:
        try:
            result_dict = ResearchAgentState.model_validate(obj).model_dump()
        except Exception:
            print(f"Error converting output of type {type(obj)}")
            result_dict = {}

    return result_dict


def orchestrator_agent(state: ResearchAgentState) -> Command:
    """Central orchestration logic that decides which agent to call next."""

    if not state.research_papers:
        return Command(
            goto=END,
            update={"final_answer": "### ⚠️ The research assistant needs at least one research paper to begin.\n"
                    "👉🏽 Please upload one or more research papers in the '📚 Research Materials' tab."}
        )

    if state.phase == "PLAN":
        agent_descriptions = "\n".join([
            f"**{agent.get('title')}**: {agent.get('description')}"
            for name, agent in state.available_agents.items()
        ])

        system_prompt = f"""You are an orchestrator for an academic research assistant. Your task is to create a plan to answer the user's query using a team of specialized agents.

**Agents:**
{agent_descriptions}

Based on the user's query, create a logical sequence of agents to call. For example, to find future scope, you should first summarize the papers, then synthesize them, and then call the future_scope_agent.

**IMPORTANT:** Always include the summary_agent as the first step when working with research papers. Every task requires proper paper summaries before analysis can begin.
"""

        user_prompt = state.user_query

        response = call_llm(system_prompt, user_prompt, MultiStepPlan)

        if response is None:
            print("⚠️ Failed to get response from LLM. Using default plan.")
            plan = ["summary_agent", "synthesis_agent", "future_scope_agent"]
            print("=" * 40)
            print("🤖 DEFAULT ORCHESTRATOR PLAN (LLM call failed)")
            print("=" * 40)
            print("\n📝 Reasoning: Default plan due to LLM call failure\n")
            print("📋 Planned Steps:")
            for i, step in enumerate(plan, 1):
                print(f"  {i}. {step}")
            print("=" * 40)
            print("⚙️ EXECUTE PLAN")
            print("=" * 40 + "\n")

            updates = {
                "plan": plan,
                "phase": "EXECUTE"
            }

            # Record the user query once in the message history.
            if not any(msg[0] == "user_query" for msg in state.messages):
                updates["messages"] = [("user_query", state.user_query)]

            return Command(goto=plan[0], update=updates)

        try:
            # Validate the plan and enforce summary_agent as the first step.
            if not hasattr(response, 'plan') or not response.plan:
                print("⚠️ Response from LLM did not contain a valid plan. Using default plan.")
                response.plan = ["summary_agent", "synthesis_agent", "future_scope_agent"]
            elif response.plan[0] != "summary_agent":
                print("⚠️ Enforcing summary_agent as first step in the plan")
                response.plan.insert(0, "summary_agent")

            print("=" * 40)
            print("🤖 ORCHESTRATOR PLAN")
            print("=" * 40)
            print(f"\n📝 Reasoning:\n{getattr(response, 'reasoning', 'No reasoning provided')}\n")
            print("📋 Planned Steps:")
            for i, step in enumerate(response.plan, 1):
                print(f"  {i}. {step}")
            print("=" * 40)
            print("⚙️ EXECUTE PLAN")
            print("=" * 40 + "\n")

            updates = {
                "plan": response.plan,
                "phase": "EXECUTE"
            }

            if not any(msg[0] == "user_query" for msg in state.messages):
                updates["messages"] = [("user_query", state.user_query)]

            return Command(goto=response.plan[0], update=updates)

        except Exception as e:
            print(f"⚠️ Error processing LLM response: {str(e)}. Using default plan.")
            plan = ["summary_agent", "synthesis_agent", "future_scope_agent"]

            updates = {
                "plan": plan,
                "phase": "EXECUTE"
            }

            if not any(msg[0] == "user_query" for msg in state.messages):
                updates["messages"] = [("user_query", state.user_query)]

            return Command(goto=plan[0], update=updates)

    if state.phase == "EXECUTE" and len(state.plan) == 0:
        return Command(
            goto="final_answer_tool",
            update={"phase": "ANSWER"}
        )

    if state.phase == "EXECUTE":
        next_agent = state.plan[0]
        remaining_plan = state.plan[1:]
        return Command(
            goto=next_agent,
            update={"plan": remaining_plan}
        )

    if state.phase == "ANSWER":
        return Command(
            goto=END,
            update={
                "phase": "PLAN",
                "messages": [("orchestrator_agent", f"\n{state.final_answer}")]
            }
        )

    return Command(goto=END, update={})


def summary_agent(state: ResearchAgentState) -> Command:
    """Creates concise, structured summaries of research papers."""

    if not state.summary:
        print("The summary agent is processing the papers... 📚")
        research_findings = []

        for filename, content in state.research_papers:
            system_prompt = """You are a research summarization expert. Please read the provided research paper content and create a clear, concise, and structured summary.
Focus on extracting key findings, methodology, and conclusions.
"""
            # Use only the first 5000 characters to stay within context limits.
            user_prompt = f"""
Paper: {filename}

Content:
{content[:5000]}

Please provide a structured summary with key findings, methodology, and conclusions.
"""

            response = call_llm(system_prompt, user_prompt, PaperSummary)

            if response is None:
                print(f"⚠️ Failed to summarize paper {filename}. Creating default summary.")
                finding = {
                    "title": filename,
                    "key_findings": ["Error: Could not summarize this paper due to API issues."],
                    "methodology": "Not available due to API error",
                    "conclusion": "Not available due to API error",
                    "source": filename
                }
                research_findings.append(finding)
            else:
                try:
                    finding = {
                        "title": filename,
                        "key_findings": response.key_findings if hasattr(response, 'key_findings') else ["No key findings extracted"],
                        "methodology": response.methodology if hasattr(response, 'methodology') else "Not provided",
                        "conclusion": response.conclusion if hasattr(response, 'conclusion') else "Not provided",
                        "source": filename
                    }
                    research_findings.append(finding)
                except Exception as e:
                    print(f"⚠️ Error processing summary for {filename}: {str(e)}")
                    finding = {
                        "title": filename,
                        "key_findings": ["Error processing paper summary."],
                        "methodology": "Error in processing",
                        "conclusion": "Error in processing",
                        "source": filename
                    }
                    research_findings.append(finding)

        print("Paper summaries complete.")

        formatted_summaries = []
        for paper in research_findings:
            findings_text = "\n".join([f"- {finding}" for finding in paper['key_findings']])
            formatted_summary = f"""
## {paper['title']}

### Key Findings:
{findings_text}

### Methodology:
{paper['methodology']}

### Conclusion:
{paper['conclusion']}
"""
            formatted_summaries.append(formatted_summary)

        combined_summary = "\n\n".join(formatted_summaries)

        agent_contribution = ("summary_agent", combined_summary)

        return Command(
            goto="orchestrator_agent",
            update={
                "summary": research_findings,
                "messages": [agent_contribution]
            }
        )
    else:
        # Summaries already exist; an empty update avoids re-applying the
        # full state through the list reducers.
        return Command(goto="orchestrator_agent", update={})


def synthesis_agent(state: ResearchAgentState) -> Command:
    """Synthesizes the summaries into a cohesive narrative."""

    agent_description = state.available_agents.get("synthesis_agent", {})
    system_prompt = agent_description.get("system_prompt")

    previous_messages = serialize_messages(state.messages)
    user_prompt = f"Please synthesize the following research summaries:\n{previous_messages}"

    print("The synthesis agent is creating a literature review...")
    response = call_llm(system_prompt, user_prompt)

    if response is None:
        response_text = "Error: Could not generate synthesis due to API issues."
        print("⚠️ Synthesis agent failed - using default response")
    else:
        response_text = response.content if hasattr(response, 'content') else str(response)

    print("Synthesis complete.")

    return Command(
        goto="orchestrator_agent",
        update={
            "messages": [("synthesis_agent", response_text)]
        }
    )


def future_scope_agent(state: ResearchAgentState) -> Command:
    """Identifies research gaps and suggests future work."""

    agent_description = state.available_agents.get("future_scope_agent", {})
    system_prompt = agent_description.get("system_prompt")

    previous_messages = serialize_messages(state.messages)
    user_prompt = f"Based on the following literature analysis, please identify gaps and suggest future research directions:\n{previous_messages}"

    print("The future scope agent is identifying research gaps...")
    response = call_llm(system_prompt, user_prompt, FutureScope)

    if response is None:
        print("⚠️ Future scope agent failed - using default response")
        report_text = (
            "### Identified Research Gaps\n- Error: Could not identify gaps due to API issues.\n\n"
            "### Suggested Future Directions\n- Error: Could not suggest directions due to API issues.\n\n"
            "### Concluding Synthesis\nError: Could not generate synthesis due to API issues."
        )
    else:
        try:
            report_text = "### Identified Research Gaps\n"
            for gap in response.identified_gaps:
                report_text += f"- {gap}\n"
            report_text += "\n### Suggested Future Directions\n"
            for direction in response.suggested_directions:
                report_text += f"- {direction}\n"
            report_text += f"\n### Concluding Synthesis\n{response.synthesis}"
        except Exception as e:
            print(f"⚠️ Error processing future scope response: {str(e)}")
            report_text = "### Error\nCould not process future scope analysis due to response format issues."

    print("Future scope analysis complete.")

    return Command(
        goto="orchestrator_agent",
        update={
            "messages": [("future_scope_agent", report_text)]
        }
    )


def critique_agent(state: ResearchAgentState) -> Command:
    """Provides feedback on the generated analysis."""

    agent_description = state.available_agents.get("critique_agent", {})
    system_prompt = agent_description.get("system_prompt")

    previous_messages = serialize_messages(state.messages)
    user_prompt = f"Please critique the following research analysis:\n{previous_messages}"

    print("The critique agent is reviewing the analysis... 🔍")
    response = call_llm(system_prompt, user_prompt)

    if response is None:
        response_text = "Error: Could not generate critique due to API issues."
        print("⚠️ Critique agent failed - using default response")
    else:
        response_text = response.content if hasattr(response, 'content') else str(response)

    print("Critique complete.")

    return Command(
        goto="orchestrator_agent",
        update={
            "critique": response_text,
            "messages": [("critique_agent", response_text)]
        }
    )


def final_answer_tool(state: ResearchAgentState) -> Command[Literal["orchestrator_agent"]]:
    """Formulate a final answer for the user based on the agent message history."""

    system_prompt = """
You're a helpful research assistant, and your role is to provide a concise final answer with all the relevant details to answer the user query, based on the provided agent message history.

Structure your response clearly. Use markdown headings for different sections (e.g., ## Synthesized Findings, ## Future Research Directions).
"""

    formatted_history = serialize_messages(state.messages)

    user_prompt = f"""
---
**Original Task:**
{state.user_query}
---
**Agent Execution History:**
{formatted_history}
---
Compile the final, comprehensive answer for the user based on the history.
"""

    response = call_llm(system_prompt, user_prompt)

    if response is None:
        final_answer = "Error: Could not generate final answer due to API issues. Please check the logs and try again."
        print("⚠️ Final answer tool failed - using default response")
    else:
        final_answer = response.content if hasattr(response, 'content') else str(response)

    # Some models emit <think>...</think> blocks; strip them from the answer.
    if isinstance(final_answer, str):
        final_answer = strip_think_blocks(final_answer)

    return Command(
        goto="orchestrator_agent",
        update={"final_answer": final_answer}
    )


def init_state():
    """Initialize the state with default values."""
    return ResearchAgentState(available_agents=available_agents)


graph = StateGraph(ResearchAgentState)
graph.add_node("orchestrator_agent", orchestrator_agent)
graph.add_node("summary_agent", summary_agent)
graph.add_node("synthesis_agent", synthesis_agent)
graph.add_node("future_scope_agent", future_scope_agent)
graph.add_node("critique_agent", critique_agent)
graph.add_node("final_answer_tool", final_answer_tool)

# Entry point.
graph.add_edge(START, "orchestrator_agent")

# Route from the orchestrator based on the current phase and remaining plan.
# (The nodes also return Command(goto=...); these edges mirror that routing.)
graph.add_conditional_edges(
    "orchestrator_agent",
    lambda state: (
        state.plan[0] if state.phase == "EXECUTE" and state.plan
        else "final_answer_tool" if state.phase == "ANSWER"
        else END
    )
)

# Every worker agent reports back to the orchestrator.
graph.add_edge("summary_agent", "orchestrator_agent")
graph.add_edge("synthesis_agent", "orchestrator_agent")
graph.add_edge("future_scope_agent", "orchestrator_agent")
graph.add_edge("critique_agent", "orchestrator_agent")
graph.add_edge("final_answer_tool", "orchestrator_agent")

graph = graph.compile()
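
# A minimal smoke test (illustrative): invoke the compiled graph directly with a
# toy paper. Gated behind a hypothetical RUN_GRAPH_SMOKE_TEST environment flag so
# it never runs on import or during normal server startup.
if __name__ == "__main__" and os.environ.get("RUN_GRAPH_SMOKE_TEST"):
    demo_state = ResearchAgentState(
        available_agents=available_agents,
        research_papers=[("demo.txt", "Toy paper content for a dry run.")],
        user_query="Summarize the key findings.",
    )
    demo_result = type_conversion(graph.invoke(input=demo_state), ResearchAgentState)
    print(demo_result.get("final_answer"))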


def extract_research_papers(
    state_dict,
    paper_files,
    max_iterations: int
) -> tuple[str, Dict, bool]:
    """Extract text from research papers and update state."""

    if isinstance(state_dict, dict):
        state = ResearchAgentState(**state_dict)
    else:
        state = ResearchAgentState()

    state.max_iterations = max_iterations

    if not paper_files:
        return "Please upload at least one research paper to analyze.", state.model_dump(), False

    console_output = StringIO()
    with contextlib.redirect_stdout(console_output):
        papers = []

        for file in paper_files:
            try:
                filename = Path(file.name).name
                print(f"📄 Processing {filename}...")

                if filename.lower().endswith(".pdf"):
                    # Prefer docling's converter (the same path used by
                    # read_file_content); fall back to PyPDF2 if it fails.
                    try:
                        converter = DocumentConverter()
                        result = converter.convert(file.name)
                        content = result.document.export_to_markdown()
                    except Exception:
                        try:
                            import PyPDF2
                            content = ""
                            with open(file.name, 'rb') as pdf_file:
                                pdf_reader = PyPDF2.PdfReader(pdf_file)
                                for page in pdf_reader.pages:
                                    content += page.extract_text() or ""
                        except ImportError:
                            print("⚠️ PDF conversion libraries not available. Please install PyPDF2.")
                            continue

                elif filename.lower().endswith(".docx"):
                    doc = docx.Document(file.name)
                    content = "\n".join([p.text for p in doc.paragraphs])
                elif filename.lower().endswith((".txt", ".md")):
                    with open(file.name, "r", encoding="utf-8") as f:
                        content = f.read()
                else:
                    print(f"⚠️ Unsupported file format: {filename}")
                    continue

                papers.append((filename, content))
                print(f"✅ Successfully extracted {len(content)} characters from {filename}")

            except Exception as e:
                print(f"❌ Error processing {file.name}: {str(e)}")

        state.research_papers = papers
        print(f"📄 Extracted content from {len(papers)} files.")

    return console_output.getvalue(), state.model_dump(), len(papers) > 0
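
# Illustrative call (assumes Gradio file objects exposing a `.name` path):
#
#   logs, new_state, ok = extract_research_papers(init_state().model_dump(), files, 1)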


def call_orchestrator(state_dict: Dict, user_query: str):
    """Run the orchestrator graph for a user query against the current state."""
    state = ResearchAgentState.model_validate(state_dict)

    state.user_query = user_query
    buffer = StringIO()
    with contextlib.redirect_stdout(buffer):
        config = {}

        try:
            result = graph.invoke(input=state, config=config)
            output_text = buffer.getvalue()
            result_dict = type_conversion(result, ResearchAgentState)
            return output_text, result_dict, True
        except Exception as e:
            error_msg = f"An error occurred during processing: {str(e)}"
            output_text = buffer.getvalue() + "\n" + error_msg
            return output_text, state_dict, False


with gr.Blocks() as research_assistant_server:
    gr.Markdown("# 📚 Academic Research Assistant")

    with gr.Row():
        with gr.Column(scale=1):
            try:
                gr.Image(value="research_assistant.png", container=False, show_download_button=False, show_fullscreen_button=False)
            except Exception:
                gr.Markdown("*Research Assistant Image*")

        with gr.Column(scale=4):
            gr.Markdown("## Your AI partner for literature reviews and research discovery.")
            gr.Markdown("Upload one or more research papers, ask a question, and let the assistant synthesize findings and identify future research directions.")

    state_dict = gr.State(value=ResearchAgentState(available_agents=available_agents).model_dump())
    extraction_successful = gr.State(value=False)
    api_key_set = gr.State(value=API_KEY is not None)

    with gr.Tabs():
        with gr.TabItem("🔑 API Key Setup"):
            gr.Markdown("### Set up your Nebius API Key")
            gr.Markdown("A valid API key is required to use this research assistant. You can either provide it here or set it as an environment variable.")

            with gr.Row():
                nebius_key_input = gr.Textbox(
                    label="Nebius API Key",
                    placeholder="Enter your Nebius API key here...",
                    type="password",
                    value=""
                )

            with gr.Row():
                discover_models_button = gr.Button("🔍 Discover Available Models", variant="secondary")
                test_model_input = gr.Textbox(
                    label="Or manually test a model name:",
                    placeholder="e.g., gpt-3.5-turbo"
                )

            available_models_display = gr.Textbox(
                label="Available Models",
                lines=5,
                interactive=False
            )

            with gr.Row():
                model_dropdown = gr.Dropdown(
                    choices=NEBIUS_MODELS,
                    value=MODEL_NAME or NEBIUS_MODELS[0],
                    label="Select Nebius Model",
                    allow_custom_value=True
                )

            api_key_status = gr.Markdown(
                "⚠️ **No API key detected.** Please enter your Nebius API key."
                if API_KEY is None
                else "✅ **API key configured.** You're ready to use the assistant."
            )

            save_key_button = gr.Button("Save API Key", variant="primary")

            def discover_models(key):
                if not key:
                    return "Please enter an API key first."

                global API_KEY, ENDPOINT_URL
                API_KEY = key
                ENDPOINT_URL = "https://api.studio.nebius.com/v1/"

                models = list_nebius_models()
                if models:
                    return "Available models:\n" + "\n".join([f"- {model}" for model in models])
                else:
                    return "Could not fetch models. Please check your API key."

            discover_models_button.click(
                fn=discover_models,
                inputs=[nebius_key_input],
                outputs=[available_models_display]
            )

            def save_api_key(key, model):
                success = setup_api_key(key, model)
                if success:
                    return f"✅ **API key saved successfully!** Using model: {MODEL_NAME}", True
                else:
                    return "❌ **Invalid API key.** Please check and try again.", False

            save_key_button.click(
                fn=save_api_key,
                inputs=[nebius_key_input, model_dropdown],
                outputs=[api_key_status, api_key_set]
            )

        with gr.TabItem("📚 Research Materials"):
            gr.Markdown("### 📄 Feed the assistant the research papers you want to analyze.")

            with gr.Row():
                research_papers_files = gr.File(
                    label="Upload Research Paper(s)",
                    file_count="multiple",
                    file_types=[".pdf", ".txt", ".docx", ".md"],
                    height=200
                )

            with gr.Accordion("Advanced options", open=False):
                max_iterations = gr.Number(label="Number of refinement iterations", value=1, precision=0)

            extract_button = gr.Button("Process Papers", variant="primary")

            extract_console_output = gr.Textbox(label="Logs / Console Output")

            def extract_with_api_check(state_dict, paper_files, max_iterations, api_key_set):
                if not api_key_set:
                    return "⚠️ Please set up your API key in the 'API Key Setup' tab first.", state_dict, False
                return extract_research_papers(state_dict, paper_files, max_iterations)

            extract_button.click(
                fn=extract_with_api_check,
                inputs=[state_dict, research_papers_files, max_iterations, api_key_set],
                outputs=[extract_console_output, state_dict, extraction_successful]
            )

        with gr.TabItem("🤖 Q&A Chatbot"):
            examples = """ℹ️ **Example Queries**
- Summarize the key findings from these papers.
- After synthesizing these articles, what are the main research gaps?
- Propose three future studies based on the provided research.
"""
            gr.Markdown(examples)
            user_query = gr.Textbox(label="Ask your research question", value="Identify the main gaps and suggest future work.", interactive=True)
            button = gr.Button("Ask the Research Assistant 💬🧠", variant="primary")

            qa_output = gr.Markdown(
                label="Research Assistant Response",
                value="### 📄 Upload papers and ask a question to get started.",
                elem_id="qa_output"
            )

            output_logs = gr.Textbox(label="Logs / Console Output", lines=10)

            def call_with_api_check(state_dict, user_query, api_key_set):
                """Wrapper to check the API key before calling the orchestrator."""
                if not API_KEY:
                    error_msg = "⚠️ Please set up your API key in the 'API Key Setup' tab first."
                    return error_msg, error_msg, state_dict

                if not state_dict.get("research_papers"):
                    error_msg = "### ⚠️ No Research Papers Found\n\n👉🏽 Please upload research papers in the '📚 Research Materials' tab first."
                    return error_msg, error_msg, state_dict

                try:
                    logs, updated_state, success = call_orchestrator(state_dict, user_query)

                    if success and updated_state.get("final_answer"):
                        final_answer = updated_state.get("final_answer")
                        return final_answer, logs, updated_state
                    else:
                        error_msg = f"### ⚠️ Processing Failed\n\n{logs}\n\nPlease check the logs above for details."
                        return error_msg, logs, state_dict

                except Exception as e:
                    error_msg = f"### ⚠️ An Error Occurred\n\n```\n{str(e)}\n```\n\nPlease check your API key and try again."
                    return error_msg, f"Error: {str(e)}", state_dict

            def reset_output():
                """Reset the output when starting a new query."""
                return "### 🤖 Processing your request...\n\nPlease wait while the research assistant analyzes your papers and generates a response.", "Generating response..."

            button.click(
                fn=reset_output,
                outputs=[qa_output, output_logs]
            ).then(
                fn=call_with_api_check,
                inputs=[state_dict, user_query, api_key_set],
                outputs=[qa_output, output_logs, state_dict]
            )
| | if __name__ == "__main__": |
| | research_assistant_server.launch(mcp_server=True) |