Spaces:
Running
Running
| import os | |
| import gradio as gr | |
| import requests | |
| import json | |
| import asyncio | |
| from typing import List, Dict, Any, Generator | |
| import logging | |
| from duckduckgo_search import DDGS | |
| from bs4 import BeautifulSoup | |
| import re | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Configuration from environment variables with defaults | |
| DEFAULT_IP = {public_ip} | |
| DEFAULT_PORT = {port} | |
| DEFAULT_KEY = {api_key} | |
| DEFAULT_MODEL = {model} | |
| llm_ip = os.environ.get('LLM_IP', DEFAULT_IP) | |
| llm_port = os.environ.get('LLM_PORT', DEFAULT_PORT) | |
| llm_key = os.environ.get('LLM_KEY', DEFAULT_KEY) | |
| llm_model = os.environ.get('LLM_MODEL', DEFAULT_MODEL) | |
| class WebTools: | |
| def __init__(self): | |
| self.session = requests.Session() | |
| self.session.headers.update({ | |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' | |
| }) | |
| self.ddgs = DDGS() | |
| def search_web(self, query: str, max_results: int = 5) -> str: | |
| """Search the web using DuckDuckGo""" | |
| try: | |
| results = self.ddgs.text(query, max_results=max_results) | |
| if not results: | |
| return f"No search results found for: {query}" | |
| formatted_results = f"Search results for '{query}':\n\n" | |
| for i, result in enumerate(results, 1): | |
| title = result.get('title', 'No title') | |
| body = result.get('body', 'No description') | |
| href = result.get('href', 'No URL') | |
| formatted_results += f"{i}. **{title}**\n{body}\nURL: {href}\n\n" | |
| return formatted_results | |
| except Exception as e: | |
| logger.error(f"Search error: {e}") | |
| return f"Search error: {str(e)}" | |
| def visit_website(self, url: str) -> str: | |
| """Visit a website and extract its text content""" | |
| try: | |
| if not url.startswith(('http://', 'https://')): | |
| url = 'https://' + url | |
| response = self.session.get(url, timeout=10) | |
| response.raise_for_status() | |
| soup = BeautifulSoup(response.content, 'html.parser') | |
| # Remove script and style elements | |
| for script in soup(["script", "style", "nav", "footer", "header"]): | |
| script.decompose() | |
| # Get text content | |
| text = soup.get_text() | |
| # Clean up text | |
| lines = (line.strip() for line in text.splitlines()) | |
| chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) | |
| text = ' '.join(chunk for chunk in chunks if chunk) | |
| # Limit text length | |
| if len(text) > 3000: | |
| text = text[:3000] + "... (content truncated)" | |
| return f"Content from {url}:\n\n{text}" | |
| except Exception as e: | |
| logger.error(f"Website visit error: {e}") | |
| return f"Error visiting {url}: {str(e)}" | |
| class LLMClient: | |
| def __init__(self, ip: str, port: str, api_key: str, model: str): | |
| self.ip = ip | |
| self.port = port | |
| self.api_key = api_key | |
| self.model = model | |
| self.base_url = f"http://{ip}:{port}/v1/chat/completions" | |
| def call_llm(self, messages: List[Dict], max_tokens: int = 512, stream: bool = False): | |
| """Call the LLM API""" | |
| headers = { | |
| "Content-Type": "application/json", | |
| "Authorization": f"Bearer {self.api_key}" | |
| } | |
| data = { | |
| "model": self.model, | |
| "messages": messages, | |
| "max_tokens": max_tokens, | |
| "stream": stream | |
| } | |
| try: | |
| response = requests.post(self.base_url, headers=headers, json=data, | |
| stream=stream, timeout=30) | |
| response.raise_for_status() | |
| if stream: | |
| return response | |
| else: | |
| result = response.json() | |
| return result["choices"][0]["message"]["content"] | |
| except Exception as e: | |
| logger.error(f"LLM API call failed: {e}") | |
| return f"Error: {str(e)}" | |
| class ReactAgent: | |
| def __init__(self, llm_client: LLMClient): | |
| self.llm_client = llm_client | |
| self.web_tools = WebTools() | |
| self.system_prompt = """You are a helpful AI assistant with access to web browsing capabilities. You can: | |
| 1. Search the web using DuckDuckGo | |
| 2. Visit and analyze websites | |
| 3. Answer questions based on current information | |
| When a user asks something that requires current information or web searching, use the available tools. | |
| Available tools: | |
| - search_web(query): Search DuckDuckGo for information | |
| - visit_website(url): Visit and extract content from a website | |
| Format your tool calls as: TOOL[tool_name: parameters] | |
| For example: TOOL[search_web: latest news about AI] or TOOL[visit_website: https://example.com] | |
| Always explain what you're doing and provide helpful responses based on the information you gather.""" | |
| def parse_tool_calls(self, text: str) -> List[Dict]: | |
| """Parse tool calls from agent response""" | |
| tool_pattern = r'TOOL\[(\w+):\s*([^\]]+)\]' | |
| matches = re.findall(tool_pattern, text) | |
| tools = [] | |
| for tool_name, params in matches: | |
| tools.append({ | |
| 'name': tool_name, | |
| 'params': params.strip() | |
| }) | |
| return tools | |
| def execute_tool(self, tool_name: str, params: str) -> str: | |
| """Execute a tool and return results""" | |
| try: | |
| if tool_name == 'search_web': | |
| return self.web_tools.search_web(params) | |
| elif tool_name == 'visit_website': | |
| return self.web_tools.visit_website(params) | |
| else: | |
| return f"Unknown tool: {tool_name}" | |
| except Exception as e: | |
| return f"Tool execution error: {str(e)}" | |
| def process_message(self, message: str, history: List[List[str]], max_tokens: int) -> Generator[str, None, None]: | |
| """Process user message with ReAct pattern""" | |
| try: | |
| # Format chat history | |
| messages = [{"role": "system", "content": self.system_prompt}] | |
| for user_msg, assistant_msg in history: | |
| messages.append({"role": "user", "content": user_msg}) | |
| if assistant_msg: | |
| messages.append({"role": "assistant", "content": assistant_msg}) | |
| messages.append({"role": "user", "content": message}) | |
| # Initial LLM call | |
| response = self.llm_client.call_llm(messages, max_tokens, stream=True) | |
| current_response = "" | |
| tool_calls_made = False | |
| # Stream initial response | |
| for line in response.iter_lines(): | |
| if line: | |
| line = line.decode('utf-8') | |
| if line.startswith('data: '): | |
| data_str = line[6:] | |
| if data_str.strip() == '[DONE]': | |
| break | |
| try: | |
| data = json.loads(data_str) | |
| if 'choices' in data and len(data['choices']) > 0: | |
| delta = data['choices'][0].get('delta', {}) | |
| content = delta.get('content', '') | |
| if content: | |
| current_response += content | |
| yield current_response | |
| except json.JSONDecodeError: | |
| continue | |
| # Check for tool calls | |
| tool_calls = self.parse_tool_calls(current_response) | |
| if tool_calls: | |
| tool_calls_made = True | |
| for tool_call in tool_calls: | |
| yield current_response + f"\n\nπ Executing {tool_call['name']}..." | |
| tool_result = self.execute_tool(tool_call['name'], tool_call['params']) | |
| # Add tool result to conversation | |
| messages.append({"role": "assistant", "content": current_response}) | |
| messages.append({"role": "user", "content": f"Tool result:\n{tool_result}\n\nPlease provide a helpful response based on this information."}) | |
| # Get final response | |
| final_response = self.llm_client.call_llm(messages, max_tokens, stream=True) | |
| final_text = current_response + f"\n\n**Tool Results:**\n{tool_result}\n\n**Response:**\n" | |
| for line in final_response.iter_lines(): | |
| if line: | |
| line = line.decode('utf-8') | |
| if line.startswith('data: '): | |
| data_str = line[6:] | |
| if data_str.strip() == '[DONE]': | |
| break | |
| try: | |
| data = json.loads(data_str) | |
| if 'choices' in data and len(data['choices']) > 0: | |
| delta = data['choices'][0].get('delta', {}) | |
| content = delta.get('content', '') | |
| if content: | |
| final_text += content | |
| yield final_text | |
| except json.JSONDecodeError: | |
| continue | |
| break # Only handle first tool call for now | |
| except Exception as e: | |
| error_msg = f"Agent error: {str(e)}" | |
| logger.error(error_msg) | |
| yield error_msg | |
| # Initialize components | |
| llm_client = LLMClient(llm_ip, llm_port, llm_key, llm_model) | |
| agent = ReactAgent(llm_client) | |
| def generate_response(message: str, history: List[List[str]], system_prompt: str, | |
| max_tokens: int, ip: str, port: str, api_key: str, model: str): | |
| """Generate streaming response using the agent""" | |
| global llm_client, agent | |
| # Update LLM client if parameters changed | |
| if (ip != llm_client.ip or port != llm_client.port or | |
| api_key != llm_client.api_key or model != llm_client.model): | |
| llm_client = LLMClient(ip, port, api_key, model) | |
| agent = ReactAgent(llm_client) | |
| # Update system prompt if provided | |
| if system_prompt.strip(): | |
| agent.system_prompt = system_prompt | |
| # Generate response | |
| for response in agent.process_message(message, history, max_tokens): | |
| yield response | |
| # Create Gradio interface | |
| chatbot = gr.ChatInterface( | |
| generate_response, | |
| chatbot=gr.Chatbot( | |
| avatar_images=[ | |
| None, | |
| "https://cdn-avatars.huggingface.co/v1/production/uploads/64e6d37e02dee9bcb9d9fa18/o_HhUnXb_PgyYlqJ6gfEO.png" | |
| ], | |
| height="64vh" | |
| ), | |
| additional_inputs=[ | |
| gr.Textbox( | |
| "You are a helpful AI assistant with web browsing capabilities. You can search the web and visit websites to provide current information. Use TOOL[search_web: query] to search or TOOL[visit_website: url] to browse websites.", | |
| label="System Prompt", | |
| lines=3 | |
| ), | |
| gr.Slider(50, 2048, label="Max Tokens", value=512, | |
| info="Maximum number of tokens in the response"), | |
| gr.Textbox(llm_ip, label="LLM IP Address", | |
| info="IP address of the LLM server"), | |
| gr.Textbox(llm_port, label="LLM Port", | |
| info="Port of the LLM server"), | |
| gr.Textbox(llm_key, label="API Key", type="password", | |
| info="API key for the LLM server"), | |
| gr.Textbox(llm_model, label="Model Name", | |
| info="Name of the model to use"), | |
| ], | |
| title="π€ AI Agent with Web Browsing", | |
| description="Chat with an AI agent that can search the web and browse websites using DuckDuckGo. Use natural language to ask for current information!", | |
| theme="finlaymacklon/smooth_slate", | |
| submit_btn="Send", | |
| retry_btn="π Regenerate Response", | |
| undo_btn="β© Delete Previous", | |
| clear_btn="ποΈ Clear Chat" | |
| ) | |
| if __name__ == "__main__": | |
| chatbot.queue().launch() |