File size: 10,408 Bytes
dec6bf3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
422b737
dec6bf3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
422b737
 
dec6bf3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import asyncio
import os
import mimetypes
import httpx
from llama_index.core.llms import ChatMessage, TextBlock, ImageBlock
from llama_index.llms.nebius import NebiusLLM
from ddgs import DDGS
from ddgs.exceptions import DDGSException
import bs4
from readability import Document

def multiply(a: float, b: float) -> float:
    """Return the product of ``a`` and ``b``."""
    product = a * b
    return product


def add(a: float, b: float) -> float:
    """Return the sum of ``a`` and ``b``."""
    total = a + b
    return total


def webSearchTool(
    query: str,
    region: str = "us-en",
    timelimit: str | None = None
) -> list[dict[str, str]]:
    """
    Run a text web search through the DDGS (DuckDuckGo metasearch) client.

    The metasearch engine can query several backends (Bing, Brave, DuckDuckGo,
    Google, Mojeek, Yandex, Yahoo, Wikipedia). Every hit is returned as a
    dictionary carrying a title, a snippet and a URL.

    Args:
        query: The search query text. Advanced search operators are supported:
            - "exact phrase" - match the exact phrase
            - term1 -term2 - exclude term2 from results
            - term1 +term2 - emphasize term2 in results
            - term filetype:pdf - restrict to a file type (pdf, doc, docx, xls, xlsx, ppt, pptx, html)
            - term site:example.com - search within a specific site
            - term -site:example.com - exclude a specific site
            - intitle:term - search in page titles
            - inurl:term - search in page URLs

        region: Search region/locale, e.g. "us-en", "uk-en", "ru-ru". Defaults to "us-en".

        timelimit: Restrict results to a time period: "d" (day), "w" (week),
                "m" (month), "y" (year). Defaults to None (no time limit).

    Returns:
        A list of dictionaries, one per search result, with keys such as
        'title', 'body', 'href', etc. An empty list is returned when the
        search raises ``DDGSException``.

    Example:
        >>> results = webSearchTool("Python programming")
        >>> results = webSearchTool("machine learning filetype:pdf")
        >>> results = webSearchTool("news site:example.com")
    """
    search_kwargs = {
        "query": query,
        "region": region,
        "timelimit": timelimit,
    }
    try:
        hits = DDGS().text(**search_kwargs)
        return list(hits)
    except DDGSException:
        # Best effort: a failed search (e.g. nothing found) is reported as "no results".
        return []


async def directFetchTool(url: str, offset: int = 0) -> str:
    """
    Fetch a webpage and return only its meaningful readable content,
    similar to a browser Reader Mode. Navigation, ads, sidebars, comments
    and other non-essential content are dropped so that only the main
    article text remains.

    Args:
        url: The URL of the webpage to fetch. Must be a valid HTTP or HTTPS URL.
        offset: Start position (in characters) within the extracted text.
            Only used when the text is longer than 10000 characters.
            Negative values are treated as 0. Defaults to 0.

    Returns:
        The extracted text of the webpage as a string.
        If the text is longer than 10000 characters, only the 10000-character
        window starting at `offset` is returned, prefixed with a notice that
        states the total length and the returned slice. Call again with a
        larger `offset` to read the next part.
        If an error occurs, the error is printed and an empty string is returned.

    Example:
        >>> content = await directFetchTool("https://example.com/article")
        >>> # Use after webSearchTool to get full page content
        >>> results = webSearchTool("Python tutorial")
        >>> if results:
        >>>     first_url = results[0].get('href')
        >>>     full_content = await directFetchTool(first_url)
    """
    max_chars = 10000  # size of the window returned for long pages
    try:
        async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
            response = await client.get(url, headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            })
            response.raise_for_status()
            html = response.text

        # Let readability pick out the main content (Reader Mode style)
        doc = Document(html)
        readable_html = doc.summary()

        # Parse the cleaned HTML and extract text
        soup = bs4.BeautifulSoup(readable_html, "html.parser")

        # Remove any remaining non-content elements
        for elem in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
            elem.decompose()

        # Extract text, one line per text node
        text = soup.get_text(separator='\n', strip=True)

        # Drop blank lines and surrounding whitespace while keeping line breaks
        stripped = (line.strip() for line in text.split('\n'))
        text = '\n'.join(line for line in stripped if line)

        if len(text) > max_chars:
            # A negative offset would be interpreted relative to the end of the
            # text and yield an empty or misleading slice, so clamp it to 0.
            start = max(offset, 0)
            end = start + max_chars
            return f"Result is too big: {len(text)} chars. Return only slice {start}:{end}: \n\n" + text[start:end]

        return text
    except httpx.HTTPStatusError as e:
        error_msg = f"Error fetching webpage (HTTP {e.response.status_code}): {url}"
        print(error_msg)
        return ""
    except httpx.TimeoutException:
        error_msg = f"Timeout while fetching webpage: {url}"
        print(error_msg)
        return ""
    except httpx.RequestError as e:
        error_msg = f"Error fetching webpage: {str(e)}"
        print(error_msg)
        return ""
    except Exception as e:
        # Best-effort tool: parsing/extraction failures are reported, not raised.
        error_msg = f"Unexpected error fetching webpage: {str(e)}"
        print(error_msg)
        return ""


async def describeImage(imgUrl: str, instructions: str = "Describe the image.") -> str:
    """
    Describe an image using an image-to-text model.

    The instructions and the image URL are sent as a single user message to
    the ``nvidia/Nemotron-Nano-V2-12b`` model through the Nebius endpoint,
    authenticated with the ``NEBIUS_API_KEY`` environment variable.

    Args:
        imgUrl: URL of the image to describe.
        instructions: Prompt text sent along with the image.
            Defaults to "Describe the image.".

    Returns:
        The stringified model reply with everything up to and including the
        last ``</think>`` marker removed and surrounding whitespace stripped.
        If any error occurs (including a failure to create the client), the
        error is printed and an empty string is returned.
    """
    try:
        # Build the client inside the try block so that configuration problems
        # are reported the same way as request failures instead of escaping.
        vision_llm = NebiusLLM(
            api_key=os.getenv("NEBIUS_API_KEY"),
            model="nvidia/Nemotron-Nano-V2-12b",
            api_base="https://api.tokenfactory.nebius.com/v1"
        )
        messages = [
            ChatMessage(
                role="user",
                blocks=[
                    TextBlock(text=instructions),
                    ImageBlock(url=imgUrl),
                ],
            ),
        ]

        response = await vision_llm.achat(messages)

        # Keep only what follows the model's reasoning section, if there is one.
        # NOTE(review): str(message) may include a role prefix such as
        # "assistant: " when no </think> marker is present - confirm whether
        # response.message.content is the intended value.
        return str(response.message).split("</think>")[-1].strip()
    except Exception as e:
        error_msg = f"Error describing image: {str(e)}"
        print(error_msg)
        return ""


async def transcribeAudio(audioUrlOrPath: str, language_code: str = None) -> str:
    """
    Transcribe an audio or video file using ElevenLabs speech-to-text API.
    
    Args:
        audioUrlOrPath: URL or local file path to the audio/video file
        language_code: Optional language code (e.g., 'en', 'es', 'fr')
    
    Returns:
        The transcribed text as a string
    """
    api_key = os.getenv("ELEVENLABS_STT_API_KEY")
    if not api_key:
        error_msg = "Error: ELEVENLABS_STT_API_KEY not found in environment variables"
        print(error_msg)
        return ""
    
    try:
        # Determine if input is a URL or file path
        is_url = audioUrlOrPath.startswith(('http://', 'https://'))
        
        # Prepare the audio file
        if is_url:
            # Download the file from URL
            async with httpx.AsyncClient() as client:
                response = await client.get(audioUrlOrPath)
                response.raise_for_status()
                audio_data = response.content
                # Try to get filename from URL or Content-Disposition header
                filename = audioUrlOrPath.split('/')[-1].split('?')[0] or 'audio_file'
        else:
            # Read from local file path
            with open(audioUrlOrPath, 'rb') as f:
                audio_data = f.read()
            filename = os.path.basename(audioUrlOrPath)
        
        # Detect MIME type from filename extension, fallback to octet-stream
        content_type, _ = mimetypes.guess_type(filename)
        if not content_type:
            content_type = 'application/octet-stream'
        
        # Prepare multipart form data
        files = {
            'file': (filename, audio_data, content_type)
        }
        
        data = { 'model_id': 'scribe_v1' }
        if language_code:
            data['language_code'] = language_code
        
        # Make the API request
        async with httpx.AsyncClient() as client:
            response = await client.post(
                'https://api.elevenlabs.io/v1/speech-to-text',
                headers={
                    'xi-api-key': api_key
                },
                files=files,
                data=data,
                timeout=300.0  # 5 minutes timeout for large files
            )
            response.raise_for_status()
            result = response.json()
            
            # Extract transcript from response
            if 'text' in result:
                return result['text']
            else:
                # Fallback: return the entire response as string
                return str(result)
                
    except httpx.HTTPStatusError as e:
        error_msg = f"Error transcribing audio (HTTP {e.response.status_code}): {e.response.text}"
        print(error_msg)
        return ""
    except Exception as e:
        error_msg = f"Error transcribing audio: {str(e)}"
        print(error_msg)
        return ""


if __name__ == "__main__":
    # Manual smoke test: load environment variables from .env (API keys are
    # read from the environment by the tools) and run one tool end to end.
    from dotenv import load_dotenv
    load_dotenv()

    # print(extract_shape("https://developers.llamaindex.ai/python/_astro/llamaindex-light.BJap_D_H.svg"))

    async def main():
        """Describe a sample image and print the result."""
        sample_image_url = "https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fstpagmaster.blob.core.windows.net%2Fcontainer-queensgambitaccepted-jpeg%2Fintro.png&f=1&nofb=1&ipt=1c348904c4fe4508d241e5527be8203e4cc2c029ed7e0cdeba3bbf372ab30a96"
        description = await describeImage(sample_image_url)
        print(description)

        # Other manual checks, kept disabled:
        # url = 'https://www.voiptroubleshooter.com/open_speech/american/OSR_us_000_0011_8k.wav'
        # print(await transcribeAudio(url, 'en'))
        # results = webSearchTool("Diplodocus nominated FA 2016")
        # print(results)
        # wp = await directFetchTool('https://en.wikipedia.org/wiki/Capital_of_France')
        # print(wp[:3000])
        # print('\n\n', len(wp))

    asyncio.run(main())