from agent import manager_agent
import gradio as gr
import smolagents
import json
import re
import ast
import requests
from PIL import Image
# Safeguard for the agent system message (does not work 100%)
safe_agent = manager_agent
SafeGuard = "You ONLY answer questions related to clinical studies. If the topic asked differs, you MUST answer politely that your purpose is solely to answer questions about clinical trials\n\n"
safe_agent.prompt_templates['system_prompt'] = SafeGuard + manager_agent.prompt_templates['system_prompt']
# Guard function to ensure the question is about clinical trials (allows bypassing a call to the agent)
def is_clinical_question(query: str) -> bool:
    keywords = [
        "clinical", "trial", "efficacy", "phase I", "phase II", "phase III", "study", "studies",
        "protocol", "adverse event", "safety", "randomized", "placebo",
        "biomarker", "endpoint", "inclusion", "exclusion"
    ]
    q = query.lower()
    # Lowercase the keywords too, otherwise "phase I/II/III" can never match the lowercased query
    return any(kw.lower() in q for kw in keywords)
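# A minimal sketch of how the guard behaves (illustrative queries only):
#   is_clinical_question("Which phase III trials used a placebo arm?")  # -> True
#   is_clinical_question("What is the weather in LA?")                  # -> False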
# Use logging to debug
import logging
from datetime import datetime

now = datetime.utcnow().isoformat()
logging.info(f"Processing request {now}")

# Use Langfuse to log traces
from langfuse import propagate_attributes
# --- PATCH --- so that the agent's internal steps can be streamed to the Gradio interface
# --- OpenTelemetry detach bug (generator-safe) ---
from opentelemetry.context import _RUNTIME_CONTEXT

_orig_detach = _RUNTIME_CONTEXT.detach

def _safe_detach(token):
    try:
        _orig_detach(token)
    except Exception:
        pass

_RUNTIME_CONTEXT.detach = _safe_detach

# --- OpenInference NonRecordingSpan bug ---
try:
    from openinference.instrumentation.smolagents import _wrappers
    _orig_finalize = _wrappers._finalize_step_span

    def _safe_finalize_step_span(span, step_log):
        # Check that the span has a status attribute before accessing it
        if hasattr(span, 'status') and hasattr(span.status, 'status_code'):
            return _orig_finalize(span, step_log)
        # For NonRecordingSpan, just skip finalization
        return None

    _wrappers._finalize_step_span = _safe_finalize_step_span
except ImportError:
    pass  # OpenInference not installed
# --- END PATCH ---
# Define the agent callback for the Gradio interface
def Agent(question, history):
    """Use a smolagents CodeAgent with tools to answer a question.
    The agent streams its thought process (planning steps) and the final answer.
    Args:
        question (str): The question to be answered by the agent.
        history (list): The conversation history so far.
    Yields:
        tuple(str, str, list): The current 'thoughts' (planning/intermediate steps),
            the current 'final_answer', and the updated history.
    """
    if not is_clinical_question(question):
        err_msg = ('A clinical trial keyword was missing (e.g. "clinical", "study", "studies", "trial", "efficacy", '
                   '"phase I", "phase II", "phase III", "protocol", "adverse event", "safety", "randomized", '
                   '"placebo", "biomarker", "endpoint", "inclusion", "exclusion").')
        history.append({"question": question, "answer": err_msg})
        yield "I can only answer questions related to clinical studies.", err_msg, history
        return
    thoughts = ""
    final_answer = ""
    try:
        logging.info(f"Received question: {question}")
        now = datetime.utcnow().isoformat()
        # Append the conversation history to the new question
        question_with_history = "Conversation history:\n" + str(history) + "\n\nNew user question:\n " + question
        # Propagate session_id to all child observations
        with propagate_attributes(tags=["Production", "Code Agent", "HuggingSpace"],
                                  user_id="Add logic for troubleshooting",
                                  session_id="Add logic to bill user according to usage"):
            for st in safe_agent.run(question_with_history, stream=True, return_full_result=True):
                if isinstance(st, smolagents.memory.PlanningStep):
                    plan = 20 * "# " + "\n# Planning of manager agent" + st.plan.split("## 2. Plan")[-1]
                    for m in plan.split("\n"):
                        thoughts += "\n" + m
                        yield thoughts, final_answer, history
                elif isinstance(st, smolagents.memory.ToolCall):
                    code = 20 * "-" + f"\n{st.name}\n\n" + st.dict()['function']['arguments'] + "\n" + 20 * "-"
                    for m in code.split("\n"):
                        thoughts += "\n" + m
                        yield thoughts, final_answer, history
                elif isinstance(st, smolagents.agents.ActionOutput):
                    if not st.output:
                        thoughts += "\n\n\n****************\nNo output from action.\n****************\n\n"
                        yield thoughts, final_answer, history
                    else:
                        thoughts += "\n***********\nNow processing the output of the tool\n***********\n\n"
                        yield thoughts, final_answer, history
                elif isinstance(st, smolagents.memory.ActionStep):
                    for chatmessage in st.model_input_messages:
                        if chatmessage.role == "assistant":
                            managed_agent_plan = chatmessage.content[0]['text'].split("2. Plan")[-1]
                            thoughts += "Managed agent plan:\n"
                            for l in managed_agent_plan.split("\n"):
                                thoughts += l
                            thoughts += "\n\n--> Code action from managed agent \n" + st.code_action + "\n\n"
                            yield thoughts, final_answer, history
                    thoughts += "\n********** End of step " + str(st.step_number) + " : *********\n" + str(st.token_usage) + "\nStep duration: " + str(st.timing) + "\n\n"
                    yield thoughts, final_answer, history
                elif isinstance(st, smolagents.memory.FinalAnswerStep):
                    final_answer = st.output
                    history.append({"question": question, "answer": final_answer})
                    yield thoughts, final_answer, history
    except GeneratorExit:
        print("Stream closed cleanly.")
        return "", "", ""
    except gr.CancelledError:
        print("Request cancelled")
        return "Request cancelled", "Submit new request", ""
# Set up the clinical trial MCP tool with structured TOON output
def tool_clinical_trial(query_cond: str = None, query_term: str = None, query_lead: str = None, max_results: str = "5") -> str:
    """
    Search the ClinicalTrials.gov database for trials with 4 arguments.
    Args:
        query_cond (str): Disease or condition (e.g., 'lung cancer', 'diabetes')
        query_term (str): Other terms such as an exact ID "NCTxxxxxxxx" or a filter (e.g., 'AREA[LastUpdatePostDate]RANGE[2023-01-15,MAX]').
        query_lead (str): Searches the LeadSponsorName
        max_results (str): Number of trials to return (max: 1000)
    Yields:
        str: a cumulative, structured (TOON-formatted) representation of the retrieved trials.
    """
    from tool_TOON_formater import TOON_formater
    try:
        max_results = int(max_results)
    except (TypeError, ValueError):
        max_results = 5
    params = {
        "query.cond": query_cond,
        "query.term": query_term,
        "query.lead": query_lead,
        "pageSize": min(max_results, 5000),
        "format": "json"
    }
    params = {k: v for k, v in params.items() if v is not None}
    try:
        response = requests.get(
            "https://clinicaltrials.gov/api/v2/studies",
            params=params,
            timeout=30
        )
        response.raise_for_status()
        studies = response.json().get("studies", [])
        structured_trials = []
        stream = ""
        for i, study in enumerate(studies):
            structured_data = TOON_formater(study)
            structured_trials.append(structured_data)
            # Stream the trials retrieved so far, separated by a divider
            stream += "\n\n" + 10 * "--" + "\n" + structured_data
            yield stream
        return structured_trials
    except Exception as e:
        # Yield (rather than return) so the error message reaches the Gradio textbox
        yield f"Error searching clinical trials: {str(e)}"
# Set up RAG
def create_rag(refs: str, VECTOR_DB_PATH: str) -> str:
    """Create a RAG (Retrieval-Augmented Generation) vector store from a list of DOIs.
    Args:
        refs (str): A comma-separated string of DOIs (Digital Object Identifiers).
        VECTOR_DB_PATH (str): The local path where the FAISS vector store should be saved.
    Returns:
        str: The path to the newly created FAISS vector store.
    """
    from tool_create_FAISS_vector import create_vector_store_from_list_of_doi
    FAISS_VECTOR_PATH = create_vector_store_from_list_of_doi(refs, VECTOR_DB_PATH)
    return FAISS_VECTOR_PATH
# Use RAG
def use_rag(query: str, store_name: str, top_k: int = 5) -> str:
    """Retrieve context from a FAISS vector store based on a query.
    Args:
        query (str): The question or query string to use for retrieval.
        store_name (str): The path to the FAISS vector store to query.
        top_k (int): The number of top-k most relevant context documents to retrieve (default: 5).
    Returns:
        str: A TOON-formatted string containing the retrieved contexts, including the contents, sources, and scores.
    """
    from tool_query_FAISS_vector import query_vector_store
    context_as_dict = query_vector_store(query, store_name, top_k)
    return str(context_as_dict)
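# Sketch of chaining the two RAG helpers (reference IDs and store name are illustrative):
#   path = create_rag("40445502, 29884191", "Ex1_vector_store")
#   context = use_rag("Which endpoints were reported?", path, top_k=3)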
# Describe a figure with Gemini
def describe_figure(image: Image.Image) -> str:
    """Provide a detailed, thorough description of an image figure.
    Args:
        image (Image.Image): The image to describe.
    Returns:
        description (str): A detailed textual description of the figure's content.
    """
    from tool_describe_figure import thorough_picture_description
    description = thorough_picture_description(image)
    return description
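# Illustrative call (the image path is hypothetical):
#   figure = Image.open("figure1.png")
#   print(describe_figure(figure))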
# Create a neat interface - Description tab as a Blocks component
with gr.Blocks() as interface1:
    with gr.Row():
        with gr.Column():
            gr.Markdown("""## Gradio Application
The server exposes a multi-tab Gradio interface for interactive operations:
1. Question Analyzer
   - Streams LLM reasoning steps, tool invocations, and intermediate thoughts
   - Displays the final, aggregated answer from the manager agent
2. Create RAG Vector Store
   - Upload a list of DOIs/PMIDs
   - Outputs the path to the stored FAISS index
3. Query RAG Vector Store
   - Retrieve contextual scientific documents for any question
4. Clinical Trial Query Interface
   - Calls the ClinicalTrials.gov API
   - Returns TOON-formatted study objects
5. Figure Description
   - Accepts image uploads
   - Produces detailed biomedical-quality captions""")
        with gr.Column():
            gr.Markdown("""## Summary
This MCP server is a full-stack multi-agent research system with:
- Hierarchical LLM planning
- Dedicated scientific and clinical tools
- Real-time execution monitoring
- FAISS-based custom RAG infrastructure
- Integrated web search and document extraction
- A complete interactive UI for researchers and clinicians

It is suitable for:
- Clinical evidence synthesis
- Scientific research workflows
- Medical question answering
- Literature reviews
- Automated extraction pipelines""")
# Question Analyzer tab
with gr.Blocks() as interface2:
    gr.Markdown("# Question Analyzer")
    with gr.Row():
        with gr.Column():
            question_input = gr.Textbox(
                label="Question",
                placeholder="Enter your question here...",
                lines=3,
            )
            gr.Examples(["What is the weather in LA?",
                         "What are the 5 most recent clinical studies sponsored by Merck?",
                         "How many trials were completed in 2025 by AbbVie?",
                         "What are the PMIDs associated with the study NCT04516746?"], question_input)
            with gr.Row():
                submit_btn = gr.Button("Submit", variant="primary")
                stop_btn = gr.Button("Stop", variant="secondary")
            response_output = gr.Textbox(
                label="Final Answer",
                placeholder="Copy/paste this output to the next tab 'Create RAG tool with FAISS vector store'",
                interactive=False,
                lines=8
            )
        with gr.Column():
            thoughts_output = gr.Textbox(
                label="LLM Thoughts/Reasoning",
                placeholder="The agent's reasoning will be output here; use it to check its validity",
                interactive=False,
                lines=8
            )
    chat_history = gr.State([])  # Necessary to save conversation history
    submit_evt = submit_btn.click(
        fn=Agent,
        inputs=[question_input, chat_history],
        outputs=[thoughts_output, response_output, chat_history],
        queue=True
    )
    stop_btn.click(fn=None, cancels=[submit_evt])
# Create RAG Vector Store tab
with gr.Blocks() as interface3:
    with gr.Row():
        with gr.Column():
            gr.Markdown('### Create a vector store. You can specify an existing vector store to update it')
            ref_input = gr.Textbox(
                label="List of references to include in vector store",
                lines=2,
                info="(can be DOIs, PMIDs, arXiv IDs, ... and a mix of them)"
            )
            vector_name_input = gr.Textbox(
                label="Name of the vector store",
                lines=2,
                placeholder="My_Diabetes_vector"
            )
        with gr.Column():
            gr.Examples(
                examples=[
                    ["I found the following PMIDs: 40445502, 40050939, 29884191", "Ex1_vector_store"],
                    ["2501.16868, 29641991, PMC5034499, 10.1007", "Ex2_vector_store"]
                ],
                inputs=[ref_input, vector_name_input]
            )
            path_output = gr.Textbox(
                label="Path of the vector store",
                lines=4
            )
    submit_btn = gr.Button("Create Vector Store")
    submit_btn.click(
        fn=create_rag,
        inputs=[ref_input, vector_name_input],
        outputs=path_output,
        queue=True
    )
# Combined interfaces with tabs
demo = gr.TabbedInterface(
    [interface1,
     interface2,
     interface3,
     gr.Interface(
         fn=use_rag,
         inputs=[gr.Textbox(label="Question that needs context to answer", placeholder="What is the dose of medicine to give an infant with type 2 diabetes"),
                 gr.Textbox(label="Name of the vector store to use", placeholder="Diabetes, Sickle_cell_anemia, Prostate_cancer, ..")],
         outputs=gr.Textbox(label="Answer with RAG", lines=8, placeholder="Your answer will be provided here"),
         api_name="use_vector_store_to_create_context"),
     gr.Interface(
         fn=tool_clinical_trial,
         inputs=[gr.Textbox(label="Disease or condition (e.g., 'lung cancer', 'diabetes')", placeholder="Diabetes"),
                 gr.Textbox(label="Other terms (e.g., 'AREA[LastUpdatePostDate]RANGE[2023-01-15,MAX]')", placeholder=""),
                 gr.Textbox(label="Searches the LeadSponsorName", placeholder="Lilly OR Sanofi"),
                 gr.Textbox(label="Max results to retrieve", placeholder="50")],
         outputs=gr.Textbox(label="TOON-formatted response", lines=10, placeholder="Your answer will be provided here"),
         api_name="use_clinical_trial_to_create_context"),
     gr.Interface(
         describe_figure,
         gr.Image(type="pil"),
         gr.Textbox(),
         api_name="figure_description"),
     ],
    ["Description",
     "Use a code agent with sandbox execution equipped with clinical trial tool",
     "Create RAG tool with FAISS vector store",
     "Query RAG tool",
     "Query clinical trial database",
     "Thorough figure description"]
)
# Allows MCP
if __name__ == "__main__":
    demo.queue().launch(mcp_server=True)
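# With mcp_server=True, the functions exposed via api_name become MCP tools.
# An MCP client would typically point at the running server's MCP endpoint
# (exact path depends on the Gradio version), e.g. http://localhost:7860/gradio_api/mcp/sse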