"""Gradio chat front-end for the RAG SmolAgent.

The module wires a sidebar (user age, residence, response type), a chatbot
widget and a "clear history" button to a single ``SmolAgent`` instance, and
post-processes each agent answer into answer text, sources and
chain-of-thought steps.
"""

import gradio as gr
import constants as cte
from rag_smolagent import SmolAgent, MessageRole
from googlesearch import search
from typing import Any, Iterator, List, Optional, Tuple

# Sentinel texts stored in ``ChatState.sources``. ``respond`` compares against
# NO_SOURCES_MESSAGE, so both the writer and the reader must use the constant.
INITIAL_SOURCES_MESSAGE = "There are no sources yet."
NO_SOURCES_MESSAGE = "No sources were used for the generation of this message"


class ChatState:
    """Mutable container for the values shared between the UI callbacks."""

    def __init__(self) -> None:
        self.sources: str = INITIAL_SOURCES_MESSAGE
        self.extra_info: str = ""
        self.age: str = ""
        self.residence: str = ""
        self.response_type: str = "Concise"  # Default response type
        # Agent memory messages already rendered as chain-of-thought steps.
        self.seen_messages: List[Any] = []
        self.cot_steps: str = ""


# NOTE(review): this is a module-level object, so every session served by this
# process reads and writes the same age/residence/sources values.
state = ChatState()

# Create the agent with its default configuration.
agent = SmolAgent()


def get_first_result(query: str) -> Optional[str]:
    """Return the URL of the first Google hit for a source description.

    The query is split on commas and only part of it is searched: the third-
    and second-to-last fields when there are more than three fields, the
    second-to-last field when there are two or three, and the whole text when
    there is no comma at all. The suffix " manual de la renta 2023" is always
    appended so that only the 2023 manual is searched.

    Args:
        query: One source line as produced by the agent.

    Returns:
        The first result URL, or ``None`` when the search yields nothing or
        raises.
    """
    parts = query.split(",")
    if len(parts) > 3:
        topic = parts[-3] + parts[-2]
    elif len(parts) >= 2:
        topic = parts[-2]
    else:
        # No comma in the line: indexing parts[-2] would raise IndexError.
        topic = parts[0]
    msg = topic + " manual de la renta 2023"  # search only in 2023's Manual

    try:
        results = search(
            msg,
            num_results=1,  # Desired number of results
            lang="es",  # Search language
        )
        # ``iter`` + default covers both an empty generator and a plain list.
        return next(iter(results), None)
    except Exception as e:
        # Best effort: a failed lookup must not break the chat response.
        print(f"Error: {e}")
        return None


def process_sources(sources: str) -> str:
    """Turn a newline-separated list of sources into Markdown links.

    Each non-empty line becomes ``[line](target)`` where ``target`` is the
    first search result for that line followed by a newline, or the text
    "Sin resultados" when no URL was found.

    Args:
        sources: Source descriptions, one per line.

    Returns:
        The generated entries joined with newlines.
    """
    entries: List[str] = []
    for line in sources.strip().split("\n"):
        clean_line = line.strip()
        if not clean_line:
            continue
        url = get_first_result(clean_line)
        url_str = (url + "\n") if url else "Sin resultados"
        entries.append(f"[{clean_line}]({url_str})")
    return "\n".join(entries)


def update_age(value: str) -> None:
    """Store the age in the state and refresh the extra info."""
    state.age = value if value else ""
    update_extra_info()


def update_residence(value: str) -> None:
    """Store the residence in the state and refresh the extra info."""
    state.residence = value if value else ""
    update_extra_info()


def update_extra_info() -> None:
    """Rebuild the extra-info sentence; it is empty unless both fields are set."""
    if state.age and state.residence:
        state.extra_info = f"Tengo {state.age} años y resido en {state.residence}."
    else:
        state.extra_info = ""


def update_response_type(value: str) -> None:
    """Store the selected response type in the state."""
    state.response_type = value


def get_prompt(query: str, extra_info: str, response_type: str) -> str:
    """Fill the prompt template with the query, extra info and response type."""
    return cte.PROMPT_TEMPLATE.format(
        query=query, extra_info=extra_info, response_type=response_type
    )


def chatbot_response(message: str, history: List[Tuple[str, str]]) -> str:
    """Query the agent and split its output into answer, sources and CoT steps.

    Side effects: ``state.sources`` and ``state.cot_steps`` are overwritten and
    newly rendered agent messages are appended to ``state.seen_messages``.

    Args:
        message: The user's question.
        history: Current chat history (not used by this function).

    Returns:
        The answer text with surrounding whitespace removed.
    """
    response: str = agent(get_prompt(message, state.extra_info, state.response_type))

    # Extract sources if present
    if "Fuentes:" in response:
        pieces = response.split("Fuentes:")
        state.sources = pieces[1].strip()
        response = pieces[0].strip()
    else:
        state.sources = NO_SOURCES_MESSAGE

    # Extract the main response
    if "Respuesta:" in response:
        answer: str = response.split("Respuesta:")[1].strip()
    else:
        answer = response

    # Extract Chain of Thought (CoT) steps from assistant messages that have
    # not been rendered for a previous question.
    cot_messages: List[Any] = agent.agent.write_memory_to_messages()
    state.cot_steps = ""
    cot_step_counter: int = 0
    for memory_message in cot_messages:
        is_assistant = memory_message.get('role') == MessageRole.ASSISTANT
        if is_assistant and memory_message not in state.seen_messages:
            for content in memory_message.get('content', []):
                if content.get('type') == 'text' and 'Thought:' in content.get('text', ''):
                    state.cot_steps += f"\n============================== Step {cot_step_counter} ==============================\n"
                    cot_step_counter += 1
                    state.cot_steps += (content['text'] + "\n")
            state.seen_messages.append(memory_message)

    return answer.strip()


def _accordion(title: str, body: str) -> str:
    """Wrap ``body`` in a collapsible section labelled ``title``.

    NOTE(review): the <details>/<summary> tags are reconstructed from the
    "accordions" intent; confirm they match the markup the chatbot should get.
    """
    return (
        "\n\n<details>\n"
        f" <summary>{title}</summary>\n\n"
        f" {body}\n\n"
        "</details>"
    )


def respond(
    message: str, chat_history: List[Tuple[str, str]]
) -> Iterator[Tuple[str, List[Tuple[str, str]]]]:
    """Handle one user message and stream chat-history updates to the UI.

    Yields three times: once with an empty bot reply, once with a
    "processing" placeholder, and once with the final reply. The first tuple
    element is always "" so that the input textbox is cleared.

    Args:
        message: Text submitted by the user.
        chat_history: Chat history list; it is mutated in place.
    """
    chat_history.append((message, ""))
    yield "", chat_history

    chat_history[-1] = (message, "⏳ Procesando respuesta...")
    yield "", chat_history

    bot_message: str = chatbot_response(message, chat_history)

    # Append sources and Chain of Thought as accordions if available
    if state.sources != NO_SOURCES_MESSAGE:
        try:
            all_url = process_sources(state.sources)
        except Exception:
            # Fall back to the raw source text when link generation fails.
            all_url = state.sources
        bot_message += _accordion("Sources", all_url)

    if state.cot_steps:
        bot_message += _accordion("Chain of Thought", state.cot_steps)

    chat_history[-1] = (message, bot_message)
    yield "", chat_history


def clear_history(chat_history: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Clear the chat history and replace the agent to drop its memory.

    Args:
        chat_history: Chat history list; it is emptied in place.

    Returns:
        The same, now empty, list.
    """
    global agent
    chat_history.clear()
    agent = SmolAgent()
    state.sources = INITIAL_SOURCES_MESSAGE
    state.seen_messages.clear()
    state.cot_steps = ""
    return chat_history


# Gradio UI setup
custom_theme = gr.Theme.load("/teamspace/studios/this_studio/AgenticRAG/theme.json")

with gr.Blocks(
    fill_height=True,
    fill_width=True,
    theme=custom_theme
) as demo:
    ## Demo Sidebar
    with gr.Sidebar():
        gr.Markdown("## User Information")
        age_input = gr.Textbox(label="Age", placeholder="Unknown", value=state.age)
        residence_input = gr.Textbox(label="Residence", placeholder="Unknown", value=state.residence)

        # Update values when fields change
        age_input.change(update_age, inputs=age_input)
        residence_input.change(update_residence, inputs=residence_input)

        gr.Markdown("## Answer customization")
        # Dropdown for response type
        response_type_dropdown = gr.Dropdown(
            label="Response Type",
            choices=["Concise", "Detailed"],
            value="Concise"
        )
        response_type_dropdown.change(update_response_type, inputs=response_type_dropdown)

        # Clear History button
        clear_button = gr.Button("Clear History", variant="secondary")
        # NOTE(review): the content is plain text with no link target; confirm
        # whether an anchor element is intended here.
        web_link = gr.HTML("Abrir página web")

    gr.Markdown("# RAG ChatBot Agent")
    with gr.Column(scale=1):
        chatbot = gr.Chatbot(
            label="Chat",
            bubble_full_width=True,
            scale=1,
            avatar_images=(None, "https://logosandtypes.com/wp-content/uploads/2022/03/cognizant.svg"),
            group_consecutive_messages=False
        )
        msg = gr.Textbox(
            show_label=False,
            placeholder="What do you want to know?",
            submit_btn=True
        )

    # Submit and clear history functionality
    msg.submit(respond, [msg, chatbot], [msg, chatbot])
    clear_button.click(clear_history, inputs=chatbot, outputs=chatbot)

demo.launch(share=True)