self-plan / sync /tools.py
lvwerra's picture
lvwerra HF Staff
Include sync.tools module for generated plans
f01f8bd
Raw
History Blame Contribute Delete
2.6 kB
import subprocess
from pathlib import Path
# httpx is an optional dependency: when the import fails, `_http` is None and
# `search` / `fetch` return stub strings instead of touching the network.
try:
    import httpx
    # One shared client reused by every request (timeout=30, redirects followed).
    _http = httpx.Client(timeout=30, follow_redirects=True)
except ImportError:
    _http = None
def search(query: str) -> str:
    """Search the web and return relevant results.

    Sends *query* to DuckDuckGo's HTML endpoint and scrapes the result
    links (anchors whose ``class`` attribute contains ``result__a``).

    Args:
        query: Free-text search query.

    Returns:
        Up to ten lines of the form ``"<title> (<href>)"`` joined by
        newlines, ``"No results found."`` when the page holds no result
        link, or a stub message when httpx is not installed.
    """
    if _http is None:
        return (
            f"[search stub for '{query}']: "
            f"install httpx for real search")

    from html.parser import HTMLParser

    class DDGParser(HTMLParser):
        """Collect one ``title (href)`` entry per result link."""

        def __init__(self):
            super().__init__()
            self.results = []
            self._href = None  # href of the currently open result link
            self._parts = []   # text chunks seen inside that link

        def handle_starttag(self, tag, attrs):
            attrs = dict(attrs)
            # A valueless attribute is reported as None, hence the `or ""`.
            if tag == "a" and "result__a" in (attrs.get("class") or ""):
                self._href = attrs.get("href") or ""
                self._parts = []

        def handle_data(self, data):
            # Keep every chunk: nested markup inside the link (e.g. <b>)
            # splits the title into several handle_data calls.
            if self._href is not None:
                self._parts.append(data)

        def handle_endtag(self, tag):
            # The title is complete only once the link itself closes.
            if tag == "a" and self._href is not None:
                title = "".join(self._parts).strip()
                self.results.append(f"{title} ({self._href})")
                self._href = None

    resp = _http.get(
        "https://html.duckduckgo.com/html/",
        params={"q": query})
    parser = DDGParser()
    parser.feed(resp.text)
    parser.close()  # flush any text the parser is still buffering
    return "\n".join(parser.results[:10]) or "No results found."
def fetch(url: str) -> str:
    """Download *url* and return the response body as text.

    Bodies longer than 50000 characters are cut at that length and get a
    "... (truncated)" marker appended. When httpx is not installed a stub
    message is returned instead of performing a request.
    """
    if _http is None:
        return f"[fetch stub for {url}]: install httpx"

    limit = 50000
    body = _http.get(url).text
    if len(body) <= limit:
        return body
    return body[:limit] + "\n... (truncated)"
def read_file(path: str) -> str:
    """Return the entire text content of the file at *path*.

    The file is opened in text mode with the default encoding.
    """
    with Path(path).open() as handle:
        return handle.read()
def write_file(path: str, content: str) -> str:
    """Write content to a file, creating parent directories as needed.

    Args:
        path: Destination file path; an existing file is overwritten.
        content: Text to write, encoded with the default text encoding.

    Returns:
        A confirmation message giving the file's size in bytes and the path.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(content)
    # len(content) counts characters and under-reports multi-byte text, so
    # ask the filesystem for the real byte size instead.
    size = target.stat().st_size
    return f"Wrote {size} bytes to {path}"
def run_command(command: str) -> str:
    """Run a shell command and return stdout+stderr.

    Args:
        command: Command line handed to the system shell.

    Returns:
        The captured stdout, followed by a ``STDERR:`` section when stderr
        is non-empty and an ``EXIT CODE:`` line when the status is non-zero.

    Raises:
        subprocess.TimeoutExpired: If the command exceeds the 60 s timeout.
    """
    # NOTE(security): shell=True is deliberate here (the argument is a shell
    # command line), so this must never receive untrusted input.
    result = subprocess.run(
        command, shell=True,
        capture_output=True, text=True,
        # Undecodable output bytes become U+FFFD instead of raising
        # UnicodeDecodeError (e.g. when a command prints binary data).
        errors="replace",
        timeout=60)
    pieces = [result.stdout]
    if result.stderr:
        pieces.append(f"\nSTDERR: {result.stderr}")
    if result.returncode != 0:
        pieces.append(f"\nEXIT CODE: {result.returncode}")
    return "".join(pieces)
def bash(script: str) -> str:
    """Run a multi-line bash script.

    Args:
        script: Script text passed to ``bash -c``.

    Returns:
        The captured stdout, followed by a ``STDERR:`` section when stderr
        is non-empty and an ``EXIT CODE:`` line when the status is non-zero.

    Raises:
        subprocess.TimeoutExpired: If the script exceeds the 300 s timeout.
    """
    result = subprocess.run(
        ["bash", "-c", script],
        capture_output=True, text=True,
        # Undecodable output bytes become U+FFFD instead of raising
        # UnicodeDecodeError (e.g. when the script prints binary data).
        errors="replace",
        timeout=300)
    pieces = [result.stdout]
    if result.stderr:
        pieces.append(f"\nSTDERR: {result.stderr}")
    if result.returncode != 0:
        pieces.append(f"\nEXIT CODE: {result.returncode}")
    return "".join(pieces)