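# NOTE: "Spaces: Paused" is page-header residue from the web view this file was copied from; it is not part of the program.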
import json
import os
import random
import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Union
from urllib.parse import urlparse, unquote

import requests
import tiktoken
from openai import OpenAI
from qwen_agent.tools.base import BaseTool, register_tool

from prompt import EXTRACTOR_PROMPT
# Module configuration, read once at import time from environment variables.

# Read from $VISIT_SERVER_TIMEOUT (default 200). Not referenced by the code
# visible in this file -- presumably consumed by importers; TODO confirm.
VISIT_SERVER_TIMEOUT = int(os.getenv("VISIT_SERVER_TIMEOUT", "200"))

# Read from $WEBCONTENT_MAXLENGTH (default 150000). Not referenced by the code
# visible in this file -- presumably consumed by importers; TODO confirm.
WEBCONTENT_MAXLENGTH = int(os.getenv("WEBCONTENT_MAXLENGTH", "150000"))

# Bearer token sent to the Jina reader endpoint; empty string when $JINA_KEY
# is not set.
JINA_API_KEYS = os.getenv("JINA_KEY", "")
def truncate_to_tokens(text: str, max_tokens: int = 95000) -> str:
    """Return *text* cut down to at most *max_tokens* ``cl100k_base`` tokens.

    Args:
        text: The text to truncate.
        max_tokens: Maximum number of tokens to keep.

    Returns:
        *text* unchanged when it already fits, otherwise the decoded prefix
        made of the first *max_tokens* tokens.
    """
    encoding = tiktoken.get_encoding("cl100k_base")
    # Scraped pages may literally contain special-token text such as
    # "<|endoftext|>". With the default settings tiktoken raises ValueError on
    # those; disallowed_special=() makes it encode them as ordinary text.
    tokens = encoding.encode(text, disallowed_special=())
    if len(tokens) <= max_tokens:
        return text
    return encoding.decode(tokens[:max_tokens])
# Response-format description whose last line is a JSON schema. The schema
# declares three sibling string properties: "rational", "evidence" and
# "summary". ("summary" used to be nested inside "evidence" because of a
# misplaced closing brace.)
OSS_JSON_FORMAT = """# Response Formats
## visit_content
{"properties":{"rational":{"type":"string","description":"Locate the **specific sections/data** directly related to the user's goal within the webpage content"},"evidence":{"type":"string","description":"Identify and extract the **most relevant information** from the content, never miss any important information, output the **full original context** of the content as far as possible, it can be more than three paragraphs."},"summary":{"type":"string","description":"Organize into a concise paragraph with logical flow, prioritizing clarity and judge the contribution of the information to the goal."}}}"""
class Visit(BaseTool):
    """Agent tool that reads web pages via the Jina reader and summarizes them.

    For each URL the page text is fetched from ``https://r.jina.ai/<url>``,
    truncated to a token budget, and sent to an OpenAI-compatible chat model
    together with the user's goal. The model's JSON answer (``evidence`` and
    ``summary`` fields) is rendered into a plain-text report.
    """

    # The `description` tells the agent the functionality of this tool.
    name = 'visit'
    description = 'Visit webpage(s) and return the summary of the content.'
    # The `parameters` tell the agent what input parameters the tool has.
    parameters = {
        "type": "object",
        "properties": {
            "url": {
                "type": ["string", "array"],
                "items": {
                    "type": "string"
                },
                "minItems": 1,
                "description": "The URL(s) of the webpage(s) to visit. Can be a single URL or an array of URLs."
            },
            "goal": {
                "type": "string",
                "description": "The goal of the visit for webpage(s)."
            }
        },
        "required": ["url", "goal"]
    }

    @staticmethod
    def _failure_message(url, goal) -> str:
        """Build the standard report used when a page cannot be read or summarized."""
        return (
            "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
            + "Evidence in page: \n" + "The provided webpage content could not be accessed. Please check the URL or file format." + "\n\n"
            + "Summary: \n" + "The webpage content could not be processed, and therefore, no information is available." + "\n\n"
        )

    @staticmethod
    def _is_usable_content(content) -> bool:
        """Return True when *content* is real page text rather than an error marker."""
        return bool(
            content
            and not content.startswith(("[visit] Failed to read page.", "[document_parser]"))
            and content != "[visit] Empty content."
        )

    @staticmethod
    def _parse_summary(raw):
        """Parse a model reply into a dict with ``evidence`` and ``summary``, else None."""
        if not isinstance(raw, str):
            return None
        cleaned = raw.replace("```json", "").replace("```", "").strip()
        try:
            parsed = json.loads(cleaned)
        except (ValueError, RecursionError):
            return None
        # Valid JSON that is not the expected object (a list, a bare string, a
        # dict missing a field) is treated like a parse failure.
        if isinstance(parsed, dict) and "evidence" in parsed and "summary" in parsed:
            return parsed
        return None

    # The `call` method is the main function of the tool.
    def call(self, params: Union[str, dict], **kwargs) -> str:
        """Visit one or several URLs and return the summary report(s).

        Args:
            params: A JSON string or an already-decoded dict with the keys
                ``url`` (a string or a list of strings) and ``goal``.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            The report for a single URL, or the per-URL reports joined by
            ``"\\n=======\\n"``. An ``"[Visit] Invalid request format"``
            message is returned when *params* cannot be interpreted.
        """
        invalid_request = "[Visit] Invalid request format: Input must be a JSON object containing 'url' and 'goal' fields"
        try:
            # Only decode when a JSON string was passed; a dict is used as is.
            if isinstance(params, str):
                params = json.loads(params)
            url = params["url"]
            goal = params["goal"]
        except (TypeError, KeyError, ValueError):
            return invalid_request

        # Create log folder if it doesn't exist
        os.makedirs("log", exist_ok=True)

        if isinstance(url, str):
            response = self.readpage_jina(url, goal)
        elif isinstance(url, (list, tuple)):
            started = time.time()
            reports = []
            for u in url:
                # Once the whole batch has used more than 900 seconds, the
                # remaining URLs get the failure report without being fetched.
                if time.time() - started > 900:
                    report = self._failure_message(u, goal)
                else:
                    try:
                        report = self.readpage_jina(u, goal)
                    except Exception as e:
                        report = f"Error fetching {u}: {str(e)}"
                reports.append(report)
            response = "\n=======\n".join(reports)
        else:
            return invalid_request

        print(f'Summary Length {len(response)}; Summary Content {response}')
        return response.strip()

    def call_server(self, msgs, max_retries=2):
        """Send *msgs* to the summary chat model and return its text reply.

        Args:
            msgs: Chat messages in the OpenAI ``messages`` format.
            max_retries: Number of request attempts before giving up.

        Returns:
            The reply content. When the reply is not valid JSON and contains
            braces, only the span from the first ``{`` to the last ``}`` is
            returned. An empty string is returned when every attempt failed or
            produced no content.
        """
        # Use summary model config, fallback to main model config
        api_key = os.environ.get("API_KEY") or os.environ.get("DR_MODEL_API_KEY")
        url_llm = os.environ.get("API_BASE") or os.environ.get("DR_MODEL_SERVER")
        model_name = os.environ.get("SUMMARY_MODEL_NAME") or os.environ.get("DR_MODEL_NAME", "qwen/qwen3-30b-a3b-instruct-2507")
        client = OpenAI(
            api_key=api_key,
            base_url=url_llm,
        )
        for _ in range(max_retries):
            try:
                chat_response = client.chat.completions.create(
                    model=model_name,
                    messages=msgs,
                    temperature=0.7
                )
                content = chat_response.choices[0].message.content
            except Exception as e:
                print(e)
                continue
            if not content:
                continue
            try:
                json.loads(content)
            except (ValueError, RecursionError):
                # extract json from string
                left = content.find('{')
                right = content.rfind('}')
                if left != -1 and right != -1 and left <= right:
                    content = content[left:right + 1]
            return content
        # Always hand back a string so callers can safely take len() of it.
        return ""

    def jina_readpage(self, url: str) -> str:
        """
        Read webpage content using Jina service.

        Args:
            url: The URL to read

        Returns:
            str: The webpage content, or "[visit] Failed to read page." when
            all three attempts fail.
        """
        max_retries = 3
        timeout = 50
        # Only send the Authorization header when a key is configured.
        headers = {"Authorization": f"Bearer {JINA_API_KEYS}"} if JINA_API_KEYS else {}
        for attempt in range(max_retries):
            try:
                response = requests.get(
                    f"https://r.jina.ai/{url}",
                    headers=headers,
                    timeout=timeout
                )
                if response.status_code == 200:
                    return response.text
                print(response.text)
            except Exception as e:
                print(f"[visit] jina request failed for {url}: {e}")
            # Brief pause between attempts; pointless after the last one.
            if attempt < max_retries - 1:
                time.sleep(0.5)
        return "[visit] Failed to read page."

    def html_readpage_jina(self, url: str) -> str:
        """Fetch *url* through ``jina_readpage``, trying up to eight times.

        Returns:
            The first usable page content, or "[visit] Failed to read page."
        """
        max_attempts = 8
        for _ in range(max_attempts):
            content = self.jina_readpage(url)
            print("jina")
            if self._is_usable_content(content):
                return content
        return "[visit] Failed to read page."

    def readpage_jina(self, url: str, goal: str) -> str:
        """
        Read a webpage through the Jina service and summarize it for a goal.

        Args:
            url: The URL to read
            goal: The goal/purpose of reading the page

        Returns:
            str: A report with "Evidence in page" and "Summary" sections. The
            standard failure report is returned when the page cannot be read
            or the model never yields usable JSON.
        """
        max_retries = int(os.getenv('VISIT_SERVER_MAX_RETRIES', 1))
        content = self.html_readpage_jina(url)
        if not self._is_usable_content(content):
            return self._failure_message(url, goal)

        content = truncate_to_tokens(content, max_tokens=95000)
        messages = [{"role": "user", "content": EXTRACTOR_PROMPT.format(webpage_content=content, goal=goal)}]
        raw = self.call_server(messages, max_retries=max_retries) or ""

        # A (near-)empty reply is retried with progressively shorter page
        # text: three times at 70% of the current length, then once more
        # capped at 25000 characters.
        parse_attempts = 3
        summary_retries = 3
        while len(raw) < 10 and summary_retries >= 0:
            if summary_retries > 0:
                truncate_length = int(0.7 * len(content))
                status_msg = (
                    f"[visit] Summary url[{url}] "
                    f"attempt {3 - summary_retries + 1}/3, "
                    f"content length: {len(content)}, "
                    f"truncating to {truncate_length} chars"
                )
            else:
                truncate_length = 25000
                status_msg = (
                    f"[visit] Summary url[{url}] failed after 3 attempts, "
                    f"final truncation to 25000 chars"
                )
            print(status_msg)
            content = content[:truncate_length]
            extraction_prompt = EXTRACTOR_PROMPT.format(
                webpage_content=content,
                goal=goal
            )
            messages = [{"role": "user", "content": extraction_prompt}]
            raw = self.call_server(messages, max_retries=max_retries) or ""
            summary_retries -= 1
            # After shrinking the content, only one parse attempt is allowed.
            parse_attempts = 1

        parsed = None
        for attempt in range(parse_attempts):
            parsed = self._parse_summary(raw)
            if parsed is not None:
                break
            # Ask the model again only while another parse attempt remains.
            if attempt < parse_attempts - 1:
                raw = self.call_server(messages, max_retries=max_retries) or ""

        if parsed is None:
            return self._failure_message(url, goal)

        useful_information = "The useful information in {url} for user goal {goal} as follows: \n\n".format(url=url, goal=goal)
        useful_information += "Evidence in page: \n" + str(parsed["evidence"]) + "\n\n"
        useful_information += "Summary: \n" + str(parsed["summary"]) + "\n\n"
        return useful_information
if __name__ == "__main__":
    # Manual smoke test: visit one page and print the resulting report.
    visit_tool = Visit()
    print(visit_tool.call('{"url": ["https://2025.aclweb.org/"], "goal": "Find the important dates page and locate the Industry Track paper submission deadline"}'))