| import subprocess |
| from pathlib import Path |
|
|
# httpx is treated as an optional dependency: when the import fails, the
# module still loads and `_http` is set to None.
try:
    import httpx
    # One shared client for the module: timeout=30 and redirects followed.
    _http = httpx.Client(timeout=30, follow_redirects=True)
except ImportError:
    # search() and fetch() check for None and return a stub message
    # instead of performing a request.
    _http = None
|
|
|
|
def search(query: str) -> str:
    """Search the web via DuckDuckGo's HTML endpoint and list the top hits.

    Args:
        query: Free-text search query.

    Returns:
        Up to ten lines of the form ``"<title> (<href>)"`` joined with
        newlines, ``"No results found."`` when the page contains no result
        links, a ``"Search failed: HTTP <code>"`` message when the endpoint
        answers with an error status, or a stub message when the shared
        HTTP client ``_http`` is None.

    Raises:
        Whatever the HTTP client raises for transport-level failures
        (connection errors, timeouts); these are not caught here.
    """
    if _http is None:
        return (
            f"[search stub for '{query}']: "
            f"install httpx for real search")

    from html.parser import HTMLParser

    class _ResultLinkParser(HTMLParser):
        """Collect ``title (href)`` for every ``<a class="...result__a...">``."""

        def __init__(self):
            super().__init__()
            self.results = []
            # href of the result anchor currently open, or None when the
            # parser is not inside one.
            self._href = None
            self._title_parts = []

        def handle_starttag(self, tag, attrs):
            if tag != "a":
                return
            attr_map = dict(attrs)
            # A valueless attribute (<a class>) is reported as None, so
            # normalise to "" before the substring test.
            if "result__a" in (attr_map.get("class") or ""):
                self._href = attr_map.get("href") or ""
                self._title_parts = []

        def handle_data(self, data):
            # Titles may contain nested markup (e.g. <b>match</b>), which
            # splits the text into several data events; gather them all.
            if self._href is not None:
                self._title_parts.append(data)

        def handle_endtag(self, tag):
            if tag == "a" and self._href is not None:
                # Collapse runs of whitespace left over from the markup.
                title = " ".join("".join(self._title_parts).split())
                self.results.append(f"{title} ({self._href})")
                self._href = None

    resp = _http.get(
        "https://html.duckduckgo.com/html/",
        params={"q": query})
    # An error page has no result links; report the status instead of
    # claiming that the query had no results.
    if resp.status_code >= 400:
        return f"Search failed: HTTP {resp.status_code}"

    parser = _ResultLinkParser()
    parser.feed(resp.text)
    parser.close()
    return "\n".join(parser.results[:10]) or "No results found."
|
|
|
|
def fetch(url: str) -> str:
    """Request *url* with the shared HTTP client and return the body text.

    Bodies longer than 50000 characters are cut to that length and a
    truncation marker is appended. When the shared client ``_http`` is
    None, a stub message is returned and no request is made.
    """
    if _http is None:
        return f"[fetch stub for {url}]: install httpx"

    limit = 50000
    body = _http.get(url).text
    if len(body) <= limit:
        return body
    return body[:limit] + "\n... (truncated)"
|
|
|
|
def read_file(path: str) -> str:
    """Read a text file and return its contents.

    The file is decoded as UTF-8 explicitly instead of relying on the
    platform's locale encoding, so the same file reads identically on
    every machine.

    Args:
        path: Location of the file to read.

    Returns:
        The decoded file contents.

    Raises:
        OSError: If the file cannot be opened (missing, a directory,
            permission denied, ...).
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    return Path(path).read_text(encoding="utf-8")
|
|
|
|
def write_file(path: str, content: str) -> str:
    """Write text to a file, creating missing parent directories.

    The text is encoded as UTF-8 explicitly instead of with the platform's
    locale encoding. An existing file at *path* is overwritten.

    Args:
        path: Destination file path.
        content: Text to write.

    Returns:
        A confirmation message ``"Wrote <n> bytes to <path>"`` where
        ``<n>`` is the size of the file on disk after the write.

    Raises:
        OSError: If the directories or the file cannot be created/written.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(content, encoding="utf-8")
    # len(content) counts characters, which differs from the byte count
    # for non-ASCII text; ask the filesystem for the real size instead.
    size = target.stat().st_size
    return f"Wrote {size} bytes to {path}"
|
|
|
|
def run_command(command: str) -> str:
    """Run a shell command and return its combined stdout and stderr.

    Args:
        command: Command line handed to the system shell.

    Returns:
        The command's stdout, followed by ``"\\nSTDERR: <stderr>"`` when
        stderr is non-empty and ``"\\nEXIT CODE: <code>"`` when the exit
        status is non-zero.

    Raises:
        subprocess.TimeoutExpired: If the command runs longer than 60
            seconds.
    """
    # NOTE: shell=True is intentional (the argument is a shell command
    # line), so the caller must only pass trusted input.
    completed = subprocess.run(
        command,
        shell=True,
        capture_output=True,
        text=True,
        # Undecodable bytes in the output become U+FFFD instead of raising
        # UnicodeDecodeError and losing the whole result.
        errors="replace",
        timeout=60,
    )
    parts = [completed.stdout]
    if completed.stderr:
        parts.append(f"\nSTDERR: {completed.stderr}")
    if completed.returncode != 0:
        parts.append(f"\nEXIT CODE: {completed.returncode}")
    return "".join(parts)
|
|
|
|
def bash(script: str) -> str:
    """Run a (possibly multi-line) script with ``bash -c``.

    Args:
        script: Bash source text to execute.

    Returns:
        The script's stdout, followed by ``"\\nSTDERR: <stderr>"`` when
        stderr is non-empty and ``"\\nEXIT CODE: <code>"`` when the exit
        status is non-zero.

    Raises:
        subprocess.TimeoutExpired: If the script runs longer than 300
            seconds.
        FileNotFoundError: If no ``bash`` executable can be found.
    """
    completed = subprocess.run(
        ["bash", "-c", script],
        capture_output=True,
        text=True,
        # Undecodable bytes in the output become U+FFFD instead of raising
        # UnicodeDecodeError and losing the whole result.
        errors="replace",
        timeout=300,
    )
    parts = [completed.stdout]
    if completed.stderr:
        parts.append(f"\nSTDERR: {completed.stderr}")
    if completed.returncode != 0:
        parts.append(f"\nEXIT CODE: {completed.returncode}")
    return "".join(parts)
|
|