| import asyncio |
| import os |
| import mimetypes |
| import httpx |
| from llama_index.core.llms import ChatMessage, TextBlock, ImageBlock |
| from llama_index.llms.nebius import NebiusLLM |
| from ddgs import DDGS |
| from ddgs.exceptions import DDGSException |
| import bs4 |
| from readability import Document |
|
|
def multiply(a: float, b: float) -> float:
    """Return the product of two numbers.

    Args:
        a: First factor.
        b: Second factor.

    Returns:
        The value of ``a * b``.
    """
    product = a * b
    return product
|
|
|
|
def add(a: float, b: float) -> float:
    """Return the sum of two numbers.

    Args:
        a: First addend.
        b: Second addend.

    Returns:
        The value of ``a + b``.
    """
    total = a + b
    return total
|
|
|
|
def webSearchTool(
    query: str,
    region: str = "us-en",
    timelimit: str | None = None,
) -> list[dict[str, str]]:
    """
    Perform a web search using DuckDuckGo metasearch across multiple backends.

    This tool searches the web using DuckDuckGo's metasearch engine, which can query
    multiple search backends including Bing, Brave, DuckDuckGo, Google, Mojeek,
    Yandex, Yahoo, and Wikipedia. Returns a list of search results with titles,
    snippets, and URLs.

    Args:
        query: The search query text. Supports advanced search operators:
            - "exact phrase" - Search for exact phrase
            - term1 -term2 - Exclude term2 from results
            - term1 +term2 - Emphasize term2 in results
            - term filetype:pdf - Search for specific file types (pdf, doc, docx, xls, xlsx, ppt, pptx, html)
            - term site:example.com - Search within a specific site
            - term -site:example.com - Exclude a specific site
            - intitle:term - Search in page titles
            - inurl:term - Search in page URLs

        region: Search region/locale. Examples: "us-en", "uk-en", "ru-ru", etc. Defaults to "us-en".

        timelimit: Limit results to a specific time period. Options: "d" (day), "w" (week),
            "m" (month), "y" (year). Defaults to None (no time limit).

    Returns:
        A list of dictionaries, where each dictionary contains search result information
        with keys such as 'title', 'body', 'href', etc. If the search library raises
        ``DDGSException``, the error is printed and an empty list is returned.

    Example:
        >>> results = webSearchTool("Python programming")
        >>> results = webSearchTool("machine learning filetype:pdf")
        >>> results = webSearchTool("news site:example.com")
    """
    try:
        # list() guarantees a concrete list whatever iterable the backend hands back.
        return list(DDGS().text(
            query=query,
            region=region,
            timelimit=timelimit,
        ))
    except DDGSException as e:
        # Best-effort tool: report the failure the same way the other tools in
        # this module do (print + empty result) instead of swallowing it silently.
        error_msg = f"Error performing web search: {str(e)}"
        print(error_msg)
        return []
|
|
|
|
async def directFetchTool(url: str, offset: int = 0) -> str:
    """
    Fetch and extract only the meaningful readable content from a webpage,
    similar to Chrome/Firefox Reader Mode. Removes navigation, ads, sidebars,
    comments, and other non-essential content, keeping only the main article text.

    Args:
        url: The URL of the webpage to fetch. Must be a valid HTTP or HTTPS URL.
        offset: Character position in the extracted text at which the returned
            window starts. Only used when the text is longer than 10000
            characters. Negative values are treated as 0 and values past the
            end are clamped to the end of the text. Default = 0.

    Returns:
        The extracted meaningful text content of the webpage as a string.
        If the text is longer than 10000 characters, only a window of at most
        10000 characters starting at ``offset`` is returned, prefixed with a
        header stating the total length and the returned range. Call again with
        a larger ``offset`` to read the next part of the page.
        If an error occurs, the error is printed and an empty string is returned.

    Example:
        >>> content = await directFetchTool("https://example.com/article")
        >>> # Use after webSearchTool to get full page content
        >>> results = webSearchTool("Python tutorial")
        >>> if results:
        >>>     first_url = results[0].get('href')
        >>>     full_content = await directFetchTool(first_url)
    """
    # Maximum number of characters of page text returned by a single call.
    max_chars = 10000
    try:
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            response = await client.get(url, headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            })
            response.raise_for_status()
            html = response.text

        # Reduce the page to its main article markup.
        doc = Document(html)
        readable_html = doc.summary()

        soup = bs4.BeautifulSoup(readable_html, "html.parser")

        # Drop any non-content elements that survived the readability pass.
        for elem in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
            elem.decompose()

        text = soup.get_text(separator='\n', strip=True)

        # Collapse blank lines and surrounding whitespace.
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        text = '\n'.join(lines)

        total = len(text)
        if total > max_chars:
            # Clamp the window so a negative or oversized offset cannot produce
            # a wrap-around slice or a header that misreports the range.
            start = min(max(offset, 0), total)
            end = min(start + max_chars, total)
            return f"Result is too big: {total} chars. Return only slice {start}:{end}: \n\n" + text[start:end]

        return text
    except httpx.HTTPStatusError as e:
        error_msg = f"Error fetching webpage (HTTP {e.response.status_code}): {url}"
        print(error_msg)
        return ""
    except httpx.TimeoutException:
        # Must precede RequestError: timeouts are a kind of request error in httpx.
        error_msg = f"Timeout while fetching webpage: {url}"
        print(error_msg)
        return ""
    except httpx.RequestError as e:
        error_msg = f"Error fetching webpage: {str(e)}"
        print(error_msg)
        return ""
    except Exception as e:
        # Tool boundary: parsing failures must not crash the calling agent.
        error_msg = f"Unexpected error fetching webpage: {str(e)}"
        print(error_msg)
        return ""
|
|
|
|
async def describeImage(imgUrl: str, instructions: str = "Describe the image.") -> str:
    """
    Describe an image using an image-to-text model served through Nebius.

    Args:
        imgUrl: URL of the image to describe.
        instructions: Prompt sent to the model together with the image.
            Defaults to "Describe the image.".

    Returns:
        The model's answer with any reasoning that precedes a ``</think>`` tag
        removed. If ``NEBIUS_API_KEY`` is not set or any error occurs, the error
        is printed and an empty string is returned.
    """
    api_key = os.getenv("NEBIUS_API_KEY")
    if not api_key:
        # Same guard as transcribeAudio: fail soft with a clear message.
        error_msg = "Error: NEBIUS_API_KEY not found in environment variables"
        print(error_msg)
        return ""

    try:
        # Built inside the try so that client construction errors are also
        # reported as an empty result instead of propagating to the caller.
        vision_llm = NebiusLLM(
            api_key=api_key,
            model="nvidia/Nemotron-Nano-V2-12b",
            api_base="https://api.tokenfactory.nebius.com/v1"
        )
        messages = [
            ChatMessage(
                role="user",
                blocks=[
                    TextBlock(text=instructions),
                    ImageBlock(url=imgUrl),
                ],
            ),
        ]

        response = await vision_llm.achat(messages)

        # Use the message content directly: str(message) also renders the role
        # prefix, which would leak into the answer when no </think> tag is present.
        content = response.message.content or ""
        return content.split("</think>")[-1].strip()
    except Exception as e:
        error_msg = f"Error extracting text: {str(e)}"
        print(error_msg)
        return ""
|
|
|
|
| async def transcribeAudio(audioUrlOrPath: str, language_code: str = None) -> str: |
| """ |
| Transcribe an audio or video file using ElevenLabs speech-to-text API. |
| |
| Args: |
| audioUrlOrPath: URL or local file path to the audio/video file |
| language_code: Optional language code (e.g., 'en', 'es', 'fr') |
| |
| Returns: |
| The transcribed text as a string |
| """ |
| api_key = os.getenv("ELEVENLABS_STT_API_KEY") |
| if not api_key: |
| error_msg = "Error: ELEVENLABS_STT_API_KEY not found in environment variables" |
| print(error_msg) |
| return "" |
| |
| try: |
| |
| is_url = audioUrlOrPath.startswith(('http://', 'https://')) |
| |
| |
| if is_url: |
| |
| async with httpx.AsyncClient() as client: |
| response = await client.get(audioUrlOrPath) |
| response.raise_for_status() |
| audio_data = response.content |
| |
| filename = audioUrlOrPath.split('/')[-1].split('?')[0] or 'audio_file' |
| else: |
| |
| with open(audioUrlOrPath, 'rb') as f: |
| audio_data = f.read() |
| filename = os.path.basename(audioUrlOrPath) |
| |
| |
| content_type, _ = mimetypes.guess_type(filename) |
| if not content_type: |
| content_type = 'application/octet-stream' |
| |
| |
| files = { |
| 'file': (filename, audio_data, content_type) |
| } |
| |
| data = { 'model_id': 'scribe_v1' } |
| if language_code: |
| data['language_code'] = language_code |
| |
| |
| async with httpx.AsyncClient() as client: |
| response = await client.post( |
| 'https://api.elevenlabs.io/v1/speech-to-text', |
| headers={ |
| 'xi-api-key': api_key |
| }, |
| files=files, |
| data=data, |
| timeout=300.0 |
| ) |
| response.raise_for_status() |
| result = response.json() |
| |
| |
| if 'text' in result: |
| return result['text'] |
| else: |
| |
| return str(result) |
| |
| except httpx.HTTPStatusError as e: |
| error_msg = f"Error transcribing audio (HTTP {e.response.status_code}): {e.response.text}" |
| print(error_msg) |
| return "" |
| except Exception as e: |
| error_msg = f"Error transcribing audio: {str(e)}" |
| print(error_msg) |
| return "" |
|
|
|
|
if __name__ == "__main__":
    from dotenv import load_dotenv

    # Load settings via python-dotenv before any tool reads os.getenv().
    load_dotenv()

    async def main():
        """Manual smoke test: describe one sample image and print the result."""
        image_url = "https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fstpagmaster.blob.core.windows.net%2Fcontainer-queensgambitaccepted-jpeg%2Fintro.png&f=1&nofb=1&ipt=1c348904c4fe4508d241e5527be8203e4cc2c029ed7e0cdeba3bbf372ab30a96"
        description = await describeImage(image_url)
        print(description)

    asyncio.run(main())
| |