arterm-sedov committed on
Commit
dd70fb1
·
1 Parent(s): 4b3a4e0

Fix tools: improve file type/extension detection, robust MIME handling in download_file_to_path

Browse files

- Enhance MIME type to extension mapping for broader support
- Add smart detection when URL/file extension mismatches Content-Type
- Refactor logic for clarity, modularity, and DRY principles
- Logging clarified for download steps and edge cases
- All code fully linted and tested for type safety and error handling

Files changed (3) hide show
  1. agent_ng/tabs/chat_tab.py +1 -0
  2. tools/file_utils.py +101 -90
  3. tools/tools.py +201 -37
agent_ng/tabs/chat_tab.py CHANGED
@@ -89,6 +89,7 @@ class ChatTab(QuickActionsMixin):
89
  ".hpp", ".java", ".go", ".rs", ".rb", ".php", ".pl", ".swift",
90
  ".kt", ".scala", ".sql", ".toml", ".env", # Common text-based code formats
91
  ".wav", ".mp3", ".aiff", ".ogg", ".flac", ".aac", # Audio files
 
92
  ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".svg", ".tiff" # Image files
93
  ],
94
  file_count="multiple",
 
89
  ".hpp", ".java", ".go", ".rs", ".rb", ".php", ".pl", ".swift",
90
  ".kt", ".scala", ".sql", ".toml", ".env", # Common text-based code formats
91
  ".wav", ".mp3", ".aiff", ".ogg", ".flac", ".aac", # Audio files
92
+ ".mp4", ".mpeg", ".mpg", ".mov", ".avi", ".flv", ".webm", ".wmv", ".3gp", ".3gpp", # Video files
93
  ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".webp", ".svg", ".tiff" # Image files
94
  ],
95
  file_count="multiple",
tools/file_utils.py CHANGED
@@ -10,7 +10,6 @@ from typing import Optional, Dict, Any, List
10
  from pathlib import Path
11
  from pydantic import BaseModel, Field, field_validator
12
 
13
-
14
  class FileInfo(BaseModel):
15
  """Pydantic model for file information."""
16
  exists: bool = Field(description="Whether the file exists and is accessible")
@@ -19,7 +18,7 @@ class FileInfo(BaseModel):
19
  size: int = Field(0, description="File size in bytes")
20
  extension: str = Field("", description="File extension (lowercase)")
21
  error: Optional[str] = Field(None, description="Error message if file access failed")
22
-
23
  @field_validator('size')
24
  @classmethod
25
  def validate_size(cls, v):
@@ -27,7 +26,6 @@ class FileInfo(BaseModel):
27
  raise ValueError('File size cannot be negative')
28
  return v
29
 
30
-
31
  class TextFileResult(BaseModel):
32
  """Pydantic model for text file reading results."""
33
  success: bool = Field(description="Whether the file was successfully read")
@@ -36,7 +34,6 @@ class TextFileResult(BaseModel):
36
  file_info: Optional[FileInfo] = Field(None, description="File information")
37
  error: Optional[str] = Field(None, description="Error message if reading failed")
38
 
39
-
40
  class BinaryFileResult(BaseModel):
41
  """Pydantic model for binary file reading results."""
42
  success: bool = Field(description="Whether the file was successfully read")
@@ -44,7 +41,6 @@ class BinaryFileResult(BaseModel):
44
  file_info: Optional[FileInfo] = Field(None, description="File information")
45
  error: Optional[str] = Field(None, description="Error message if reading failed")
46
 
47
-
48
  class ToolResponse(BaseModel):
49
  """Pydantic model for standardized tool responses."""
50
  type: str = Field(default="tool_response", description="Response type identifier")
@@ -53,15 +49,14 @@ class ToolResponse(BaseModel):
53
  error: Optional[str] = Field(None, description="Error message if tool failed")
54
  file_info: Optional[FileInfo] = Field(None, description="File information if applicable")
55
 
56
-
57
  class FileUtils:
58
  """Utility class for common file operations."""
59
-
60
  @staticmethod
61
  def file_exists(file_path: str) -> bool:
62
  """Check if file exists and is accessible."""
63
  return os.path.exists(file_path) and os.path.isfile(file_path)
64
-
65
  @staticmethod
66
  def get_file_size(file_path: str) -> int:
67
  """Get file size in bytes."""
@@ -69,7 +64,7 @@ class FileUtils:
69
  return os.path.getsize(file_path)
70
  except OSError:
71
  return 0
72
-
73
  @staticmethod
74
  def get_file_info(file_path: str) -> FileInfo:
75
  """Get comprehensive file information with Pydantic validation."""
@@ -78,7 +73,7 @@ class FileUtils:
78
  exists=False,
79
  error=f"File not found: {file_path}"
80
  )
81
-
82
  try:
83
  return FileInfo(
84
  exists=True,
@@ -92,22 +87,22 @@ class FileUtils:
92
  exists=False,
93
  error=f"Error getting file info: {str(e)}"
94
  )
95
-
96
  @staticmethod
97
  def read_text_file(file_path: str, encodings: List[str] = None) -> TextFileResult:
98
  """
99
  Read text file with multiple encoding fallback and Pydantic validation.
100
-
101
  Args:
102
  file_path: Path to the text file
103
  encodings: List of encodings to try (default: ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1'])
104
-
105
  Returns:
106
  TextFileResult with validated content, encoding used, and metadata
107
  """
108
  if encodings is None:
109
  encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
110
-
111
  file_info = FileUtils.get_file_info(file_path)
112
  if not file_info.exists:
113
  return TextFileResult(
@@ -115,12 +110,12 @@ class FileUtils:
115
  error=file_info.error,
116
  file_info=file_info
117
  )
118
-
119
  for encoding in encodings:
120
  try:
121
  with open(file_path, 'r', encoding=encoding) as f:
122
  content = f.read()
123
-
124
  return TextFileResult(
125
  success=True,
126
  content=content,
@@ -135,13 +130,13 @@ class FileUtils:
135
  error=f"Error reading file: {str(e)}",
136
  file_info=file_info
137
  )
138
-
139
  return TextFileResult(
140
  success=False,
141
  error="File appears to be binary and cannot be read as text",
142
  file_info=file_info
143
  )
144
-
145
  @staticmethod
146
  def read_binary_file(file_path: str) -> BinaryFileResult:
147
  """Read binary file and return base64 encoded content with Pydantic validation."""
@@ -152,12 +147,12 @@ class FileUtils:
152
  error=file_info.error,
153
  file_info=file_info
154
  )
155
-
156
  try:
157
  import base64
158
  with open(file_path, 'rb') as f:
159
  content = f.read()
160
-
161
  return BinaryFileResult(
162
  success=True,
163
  content=base64.b64encode(content).decode('utf-8'),
@@ -169,7 +164,7 @@ class FileUtils:
169
  error=f"Error reading binary file: {str(e)}",
170
  file_info=file_info
171
  )
172
-
173
  @staticmethod
174
  def create_tool_response(tool_name: str, result: str = None, error: str = None,
175
  file_info: FileInfo = None) -> str:
@@ -187,16 +182,16 @@ class FileUtils:
187
  )
188
  else:
189
  sanitized_file_info = None
190
-
191
  response = ToolResponse(
192
  tool_name=tool_name,
193
  result=result, # Full result, no truncation
194
  error=error,
195
  file_info=sanitized_file_info
196
  )
197
-
198
  return response.model_dump_json(indent=2)
199
-
200
  @staticmethod
201
  def format_file_size(size_bytes: int) -> str:
202
  """Format file size in human-readable format."""
@@ -208,46 +203,46 @@ class FileUtils:
208
  return f"{size_bytes // 1024} KB"
209
  else:
210
  return f"{size_bytes // (1024 * 1024)} MB"
211
-
212
  @staticmethod
213
  def file_to_base64(file_path: str) -> str:
214
  """
215
  Convert file to base64 encoded string.
216
-
217
  Args:
218
  file_path (str): Path to the file to convert
219
-
220
  Returns:
221
  str: Base64 encoded file content
222
-
223
  Raises:
224
  FileNotFoundError: If file doesn't exist
225
  IOError: If file can't be read
226
  """
227
  import base64
228
-
229
  if not FileUtils.file_exists(file_path):
230
  raise FileNotFoundError(f"File not found: {file_path}")
231
-
232
  try:
233
  with open(file_path, 'rb') as f:
234
  file_content = f.read()
235
  return base64.b64encode(file_content).decode('utf-8')
236
  except Exception as e:
237
  raise IOError(f"Error reading file {file_path}: {str(e)}")
238
-
239
  @staticmethod
240
  def download_file_to_path(url: str, target_path: str = None) -> str:
241
  """
242
  Download file from URL to local path.
243
-
244
  Args:
245
  url (str): URL to download from
246
  target_path (str, optional): Local path to save to. If None, creates temp file.
247
-
248
  Returns:
249
  str: Path to downloaded file
250
-
251
  Raises:
252
  requests.RequestException: If download fails
253
  IOError: If file can't be written
@@ -255,23 +250,34 @@ class FileUtils:
255
  import requests
256
  import tempfile
257
  import os
 
258
  from urllib.parse import urlparse
259
-
 
 
260
  try:
 
 
 
 
 
261
  # First make a HEAD request to get Content-Type
262
- head_response = requests.head(url)
 
263
  head_response.raise_for_status()
264
-
 
 
265
  if target_path is None:
266
  # Create temp file with proper extension
267
  parsed_url = urlparse(url)
268
  filename = os.path.basename(parsed_url.path) or "downloaded_file"
269
  # Extract extension from URL
270
  _, url_ext = os.path.splitext(filename)
271
-
272
  # Get Content-Type header
273
  content_type = head_response.headers.get('content-type', '').lower()
274
-
275
  # MIME type to extension mapping
276
  mime_to_ext = {
277
  # Documents
@@ -285,7 +291,7 @@ class FileUtils:
285
  'application/rtf': '.rtf',
286
  'application/zip': '.zip',
287
  'application/x-zip-compressed': '.zip',
288
-
289
  # Text formats
290
  'text/plain': '.txt',
291
  'text/html': '.html',
@@ -295,7 +301,7 @@ class FileUtils:
295
  'text/xml': '.xml',
296
  'application/json': '.json',
297
  'application/xml': '.xml',
298
-
299
  # Images
300
  'image/jpeg': '.jpg',
301
  'image/jpg': '.jpg',
@@ -305,41 +311,41 @@ class FileUtils:
305
  'image/svg+xml': '.svg',
306
  'image/bmp': '.bmp',
307
  'image/tiff': '.tiff',
308
-
309
  # Audio
310
  'audio/mpeg': '.mp3',
311
  'audio/wav': '.wav',
312
  'audio/ogg': '.ogg',
313
  'audio/mp4': '.m4a',
314
-
315
  # Video
316
  'video/mp4': '.mp4',
317
  'video/avi': '.avi',
318
  'video/quicktime': '.mov',
319
  'video/x-msvideo': '.avi',
320
  }
321
-
322
  # Smart extension detection strategy:
323
  # 1. If Content-Type is specific and matches known types, use it
324
  # 2. If URL has a standard extension, use it
325
  # 3. Fallback to Content-Type if URL extension is non-standard
326
-
327
  ext = None
328
  content_type_ext = None
329
  url_ext_valid = False
330
-
331
  # Get extension from Content-Type
332
  for mime_type, extension in mime_to_ext.items():
333
  if mime_type in content_type:
334
  content_type_ext = extension
335
  break
336
-
337
  # Check if URL extension is valid (standard file extension)
338
  if url_ext:
339
  # Check if URL extension matches any known extension
340
  known_extensions = set(mime_to_ext.values())
341
  url_ext_valid = url_ext.lower() in known_extensions
342
-
343
  # Decision logic
344
  if content_type_ext and url_ext_valid:
345
  # Both are valid - prefer Content-Type for accuracy
@@ -356,83 +362,84 @@ class FileUtils:
356
  else:
357
  # No extension found
358
  ext = ''
359
-
360
  temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
361
  target_path = temp_file.name
362
  temp_file.close()
363
-
364
  # Now download the file
365
- response = requests.get(url, stream=True)
 
366
  response.raise_for_status()
367
-
368
  with open(target_path, 'wb') as f:
369
  for chunk in response.iter_content(chunk_size=8192):
370
  f.write(chunk)
371
-
 
372
  return target_path
373
-
374
  except Exception as e:
375
  raise IOError(f"Error downloading file from {url}: {str(e)}")
376
-
377
  @staticmethod
378
  def generate_unique_filename(original_filename: str, session_id: str = "default") -> str:
379
  """
380
  Generate a unique filename with timestamp and hash (no session prefix since we use session folders).
381
-
382
  Args:
383
  original_filename (str): Original filename from user upload
384
  session_id (str): Session ID for isolation (used for folder organization)
385
-
386
  Returns:
387
  str: Unique filename with timestamp and hash
388
  """
389
  import hashlib
390
  import time
391
  from pathlib import Path
392
-
393
  # Get file extension
394
  path_obj = Path(original_filename)
395
  name_without_ext = path_obj.stem
396
  extension = path_obj.suffix
397
-
398
  # Generate timestamp and hash (include session_id for uniqueness across sessions)
399
  timestamp = str(int(time.time() * 1000)) # milliseconds
400
  hash_suffix = hashlib.md5(f"{original_filename}{timestamp}{session_id}".encode()).hexdigest()[:8]
401
-
402
  # Create unique filename with session ID for better uniqueness and clarity
403
  unique_name = f"{session_id}_{name_without_ext}_{timestamp}_{hash_suffix}{extension}"
404
-
405
  return unique_name
406
-
407
  @staticmethod
408
  def get_gradio_cache_path() -> str:
409
  """
410
  Get the current Gradio cache directory path.
411
-
412
  Returns:
413
  str: Path to Gradio's cache directory
414
  """
415
  import os
416
  import tempfile
417
-
418
  # Check if GRADIO_TEMP_DIR is set
419
  gradio_temp = os.environ.get('GRADIO_TEMP_DIR')
420
  if gradio_temp:
421
  return gradio_temp
422
-
423
  # Default to system temp directory
424
  return tempfile.gettempdir()
425
-
426
-
427
  @staticmethod
428
  def resolve_file_reference(file_reference: str, agent=None) -> str:
429
  """
430
  Resolve file reference (filename or URL) to full file path.
431
-
432
  Args:
433
  file_reference (str): Original filename from user upload OR URL
434
  agent: Agent instance with file registry (optional)
435
-
436
  Returns:
437
  str: Full path to the file, or None if not found
438
  """
@@ -442,41 +449,45 @@ class FileUtils:
442
  # Download URL to temp file
443
  return FileUtils.download_file_to_path(file_reference)
444
  except Exception as e:
445
- print(f"⚠️ Failed to download URL {file_reference}: {e}")
446
- return None
447
-
 
 
 
 
448
  # It's a filename - resolve using agent's file registry
449
  if agent and hasattr(agent, 'get_file_path'):
450
  return agent.get_file_path(file_reference)
451
-
452
  return None
453
-
454
  @staticmethod
455
  def resolve_file_path(original_filename: str, agent=None) -> str:
456
  """
457
  Resolve original filename to full file path using agent's file registry.
458
-
459
  Args:
460
  original_filename (str): Original filename from user upload
461
  agent: Agent instance with file registry (optional)
462
-
463
  Returns:
464
  str: Full path to the file, or None if not found
465
  """
466
  if agent and hasattr(agent, 'get_file_path'):
467
  return agent.get_file_path(original_filename)
468
-
469
  return None
470
-
471
  @staticmethod
472
  def resolve_code_input(code_reference: str, agent=None) -> tuple[str, str]:
473
  """
474
  Resolve code reference to actual code content and detected language.
475
-
476
  Args:
477
  code_reference (str): Code content, filename, or URL
478
  agent: Agent instance for file resolution (optional)
479
-
480
  Returns:
481
  tuple: (code_content, detected_language)
482
  """
@@ -491,25 +502,25 @@ class FileUtils:
491
  return result.content, language
492
  except Exception as e:
493
  raise ValueError(f"Failed to download URL {code_reference}: {str(e)}")
494
-
495
  # Check if it's a file path (try to resolve via agent first, then direct path)
496
  file_path = None
497
  if agent and hasattr(agent, 'get_file_path'):
498
  file_path = agent.get_file_path(code_reference)
499
-
500
  if not file_path and os.path.exists(code_reference):
501
  file_path = code_reference
502
-
503
  if file_path and os.path.exists(file_path):
504
  result = FileUtils.read_text_file(file_path)
505
  if not result.success:
506
  raise ValueError(f"Failed to read file: {result.error}")
507
  language = FileUtils.detect_language_from_extension(file_path)
508
  return result.content, language
509
-
510
  # It's code content - return as-is with no language detection
511
  return code_reference, None
512
-
513
  @staticmethod
514
  def detect_language_from_extension(file_path: str) -> str:
515
  """Detect programming language from file extension."""
@@ -534,7 +545,7 @@ class FileUtils:
534
  '.swift': 'swift'
535
  }
536
  return extension_map.get(Path(file_path).suffix.lower(), 'python')
537
-
538
  @staticmethod
539
  def is_text_file(file_path: str) -> bool:
540
  """Check if file is likely a text file based on extension."""
@@ -544,7 +555,7 @@ class FileUtils:
544
  '.cfg', '.conf', '.env', '.csv', '.tsv'
545
  }
546
  return Path(file_path).suffix.lower() in text_extensions
547
-
548
  @staticmethod
549
  def is_image_file(file_path: str) -> bool:
550
  """Check if file is likely an image file based on extension."""
@@ -552,7 +563,7 @@ class FileUtils:
552
  '.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.webp', '.svg'
553
  }
554
  return Path(file_path).suffix.lower() in image_extensions
555
-
556
  @staticmethod
557
  def is_audio_file(file_path: str) -> bool:
558
  """Check if file is likely an audio file based on extension."""
@@ -560,7 +571,7 @@ class FileUtils:
560
  '.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a', '.wma'
561
  }
562
  return Path(file_path).suffix.lower() in audio_extensions
563
-
564
  @staticmethod
565
  def is_video_file(file_path: str) -> bool:
566
  """Check if file is likely a video file based on extension."""
@@ -568,7 +579,7 @@ class FileUtils:
568
  '.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm', '.mkv'
569
  }
570
  return Path(file_path).suffix.lower() in video_extensions
571
-
572
  @staticmethod
573
  def is_pdf_file(file_path: str) -> bool:
574
  """Check if file is likely a PDF file based on extension."""
 
10
  from pathlib import Path
11
  from pydantic import BaseModel, Field, field_validator
12
 
 
13
  class FileInfo(BaseModel):
14
  """Pydantic model for file information."""
15
  exists: bool = Field(description="Whether the file exists and is accessible")
 
18
  size: int = Field(0, description="File size in bytes")
19
  extension: str = Field("", description="File extension (lowercase)")
20
  error: Optional[str] = Field(None, description="Error message if file access failed")
21
+
22
  @field_validator('size')
23
  @classmethod
24
  def validate_size(cls, v):
 
26
  raise ValueError('File size cannot be negative')
27
  return v
28
 
 
29
  class TextFileResult(BaseModel):
30
  """Pydantic model for text file reading results."""
31
  success: bool = Field(description="Whether the file was successfully read")
 
34
  file_info: Optional[FileInfo] = Field(None, description="File information")
35
  error: Optional[str] = Field(None, description="Error message if reading failed")
36
 
 
37
  class BinaryFileResult(BaseModel):
38
  """Pydantic model for binary file reading results."""
39
  success: bool = Field(description="Whether the file was successfully read")
 
41
  file_info: Optional[FileInfo] = Field(None, description="File information")
42
  error: Optional[str] = Field(None, description="Error message if reading failed")
43
 
 
44
  class ToolResponse(BaseModel):
45
  """Pydantic model for standardized tool responses."""
46
  type: str = Field(default="tool_response", description="Response type identifier")
 
49
  error: Optional[str] = Field(None, description="Error message if tool failed")
50
  file_info: Optional[FileInfo] = Field(None, description="File information if applicable")
51
 
 
52
  class FileUtils:
53
  """Utility class for common file operations."""
54
+
55
  @staticmethod
56
  def file_exists(file_path: str) -> bool:
57
  """Check if file exists and is accessible."""
58
  return os.path.exists(file_path) and os.path.isfile(file_path)
59
+
60
  @staticmethod
61
  def get_file_size(file_path: str) -> int:
62
  """Get file size in bytes."""
 
64
  return os.path.getsize(file_path)
65
  except OSError:
66
  return 0
67
+
68
  @staticmethod
69
  def get_file_info(file_path: str) -> FileInfo:
70
  """Get comprehensive file information with Pydantic validation."""
 
73
  exists=False,
74
  error=f"File not found: {file_path}"
75
  )
76
+
77
  try:
78
  return FileInfo(
79
  exists=True,
 
87
  exists=False,
88
  error=f"Error getting file info: {str(e)}"
89
  )
90
+
91
  @staticmethod
92
  def read_text_file(file_path: str, encodings: List[str] = None) -> TextFileResult:
93
  """
94
  Read text file with multiple encoding fallback and Pydantic validation.
95
+
96
  Args:
97
  file_path: Path to the text file
98
  encodings: List of encodings to try (default: ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1'])
99
+
100
  Returns:
101
  TextFileResult with validated content, encoding used, and metadata
102
  """
103
  if encodings is None:
104
  encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
105
+
106
  file_info = FileUtils.get_file_info(file_path)
107
  if not file_info.exists:
108
  return TextFileResult(
 
110
  error=file_info.error,
111
  file_info=file_info
112
  )
113
+
114
  for encoding in encodings:
115
  try:
116
  with open(file_path, 'r', encoding=encoding) as f:
117
  content = f.read()
118
+
119
  return TextFileResult(
120
  success=True,
121
  content=content,
 
130
  error=f"Error reading file: {str(e)}",
131
  file_info=file_info
132
  )
133
+
134
  return TextFileResult(
135
  success=False,
136
  error="File appears to be binary and cannot be read as text",
137
  file_info=file_info
138
  )
139
+
140
  @staticmethod
141
  def read_binary_file(file_path: str) -> BinaryFileResult:
142
  """Read binary file and return base64 encoded content with Pydantic validation."""
 
147
  error=file_info.error,
148
  file_info=file_info
149
  )
150
+
151
  try:
152
  import base64
153
  with open(file_path, 'rb') as f:
154
  content = f.read()
155
+
156
  return BinaryFileResult(
157
  success=True,
158
  content=base64.b64encode(content).decode('utf-8'),
 
164
  error=f"Error reading binary file: {str(e)}",
165
  file_info=file_info
166
  )
167
+
168
  @staticmethod
169
  def create_tool_response(tool_name: str, result: str = None, error: str = None,
170
  file_info: FileInfo = None) -> str:
 
182
  )
183
  else:
184
  sanitized_file_info = None
185
+
186
  response = ToolResponse(
187
  tool_name=tool_name,
188
  result=result, # Full result, no truncation
189
  error=error,
190
  file_info=sanitized_file_info
191
  )
192
+
193
  return response.model_dump_json(indent=2)
194
+
195
  @staticmethod
196
  def format_file_size(size_bytes: int) -> str:
197
  """Format file size in human-readable format."""
 
203
  return f"{size_bytes // 1024} KB"
204
  else:
205
  return f"{size_bytes // (1024 * 1024)} MB"
206
+
207
  @staticmethod
208
  def file_to_base64(file_path: str) -> str:
209
  """
210
  Convert file to base64 encoded string.
211
+
212
  Args:
213
  file_path (str): Path to the file to convert
214
+
215
  Returns:
216
  str: Base64 encoded file content
217
+
218
  Raises:
219
  FileNotFoundError: If file doesn't exist
220
  IOError: If file can't be read
221
  """
222
  import base64
223
+
224
  if not FileUtils.file_exists(file_path):
225
  raise FileNotFoundError(f"File not found: {file_path}")
226
+
227
  try:
228
  with open(file_path, 'rb') as f:
229
  file_content = f.read()
230
  return base64.b64encode(file_content).decode('utf-8')
231
  except Exception as e:
232
  raise IOError(f"Error reading file {file_path}: {str(e)}")
233
+
234
  @staticmethod
235
  def download_file_to_path(url: str, target_path: str = None) -> str:
236
  """
237
  Download file from URL to local path.
238
+
239
  Args:
240
  url (str): URL to download from
241
  target_path (str, optional): Local path to save to. If None, creates temp file.
242
+
243
  Returns:
244
  str: Path to downloaded file
245
+
246
  Raises:
247
  requests.RequestException: If download fails
248
  IOError: If file can't be written
 
250
  import requests
251
  import tempfile
252
  import os
253
+ import logging
254
  from urllib.parse import urlparse
255
+
256
+ logger = logging.getLogger(__name__)
257
+
258
  try:
259
+ # Add polite bot identification headers
260
+ headers = {
261
+ 'User-Agent': 'CMW-Platform-Agent/1.0 (+https://github.com/arterm-sedov/cmw-platform-agent) Mozilla/5.0'
262
+ }
263
+
264
  # First make a HEAD request to get Content-Type
265
+ logger.info(f"Attempting to download from URL: {url}")
266
+ head_response = requests.head(url, headers=headers, timeout=30, allow_redirects=True)
267
  head_response.raise_for_status()
268
+ content_type = head_response.headers.get('content-type', 'unknown')
269
+ logger.info(f"HEAD request successful, Content-Type: {content_type}")
270
+
271
  if target_path is None:
272
  # Create temp file with proper extension
273
  parsed_url = urlparse(url)
274
  filename = os.path.basename(parsed_url.path) or "downloaded_file"
275
  # Extract extension from URL
276
  _, url_ext = os.path.splitext(filename)
277
+
278
  # Get Content-Type header
279
  content_type = head_response.headers.get('content-type', '').lower()
280
+
281
  # MIME type to extension mapping
282
  mime_to_ext = {
283
  # Documents
 
291
  'application/rtf': '.rtf',
292
  'application/zip': '.zip',
293
  'application/x-zip-compressed': '.zip',
294
+
295
  # Text formats
296
  'text/plain': '.txt',
297
  'text/html': '.html',
 
301
  'text/xml': '.xml',
302
  'application/json': '.json',
303
  'application/xml': '.xml',
304
+
305
  # Images
306
  'image/jpeg': '.jpg',
307
  'image/jpg': '.jpg',
 
311
  'image/svg+xml': '.svg',
312
  'image/bmp': '.bmp',
313
  'image/tiff': '.tiff',
314
+
315
  # Audio
316
  'audio/mpeg': '.mp3',
317
  'audio/wav': '.wav',
318
  'audio/ogg': '.ogg',
319
  'audio/mp4': '.m4a',
320
+
321
  # Video
322
  'video/mp4': '.mp4',
323
  'video/avi': '.avi',
324
  'video/quicktime': '.mov',
325
  'video/x-msvideo': '.avi',
326
  }
327
+
328
  # Smart extension detection strategy:
329
  # 1. If Content-Type is specific and matches known types, use it
330
  # 2. If URL has a standard extension, use it
331
  # 3. Fallback to Content-Type if URL extension is non-standard
332
+
333
  ext = None
334
  content_type_ext = None
335
  url_ext_valid = False
336
+
337
  # Get extension from Content-Type
338
  for mime_type, extension in mime_to_ext.items():
339
  if mime_type in content_type:
340
  content_type_ext = extension
341
  break
342
+
343
  # Check if URL extension is valid (standard file extension)
344
  if url_ext:
345
  # Check if URL extension matches any known extension
346
  known_extensions = set(mime_to_ext.values())
347
  url_ext_valid = url_ext.lower() in known_extensions
348
+
349
  # Decision logic
350
  if content_type_ext and url_ext_valid:
351
  # Both are valid - prefer Content-Type for accuracy
 
362
  else:
363
  # No extension found
364
  ext = ''
365
+
366
  temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
367
  target_path = temp_file.name
368
  temp_file.close()
369
+
370
  # Now download the file
371
+ logger.info(f"Starting download to: {target_path}")
372
+ response = requests.get(url, headers=headers, stream=True, timeout=60, allow_redirects=True)
373
  response.raise_for_status()
374
+
375
  with open(target_path, 'wb') as f:
376
  for chunk in response.iter_content(chunk_size=8192):
377
  f.write(chunk)
378
+
379
+ logger.info(f"Download completed successfully: {target_path}")
380
  return target_path
381
+
382
  except Exception as e:
383
  raise IOError(f"Error downloading file from {url}: {str(e)}")
384
+
385
  @staticmethod
386
  def generate_unique_filename(original_filename: str, session_id: str = "default") -> str:
387
  """
388
  Generate a unique filename with timestamp and hash (no session prefix since we use session folders).
389
+
390
  Args:
391
  original_filename (str): Original filename from user upload
392
  session_id (str): Session ID for isolation (used for folder organization)
393
+
394
  Returns:
395
  str: Unique filename with timestamp and hash
396
  """
397
  import hashlib
398
  import time
399
  from pathlib import Path
400
+
401
  # Get file extension
402
  path_obj = Path(original_filename)
403
  name_without_ext = path_obj.stem
404
  extension = path_obj.suffix
405
+
406
  # Generate timestamp and hash (include session_id for uniqueness across sessions)
407
  timestamp = str(int(time.time() * 1000)) # milliseconds
408
  hash_suffix = hashlib.md5(f"{original_filename}{timestamp}{session_id}".encode()).hexdigest()[:8]
409
+
410
  # Create unique filename with session ID for better uniqueness and clarity
411
  unique_name = f"{session_id}_{name_without_ext}_{timestamp}_{hash_suffix}{extension}"
412
+
413
  return unique_name
414
+
415
  @staticmethod
416
  def get_gradio_cache_path() -> str:
417
  """
418
  Get the current Gradio cache directory path.
419
+
420
  Returns:
421
  str: Path to Gradio's cache directory
422
  """
423
  import os
424
  import tempfile
425
+
426
  # Check if GRADIO_TEMP_DIR is set
427
  gradio_temp = os.environ.get('GRADIO_TEMP_DIR')
428
  if gradio_temp:
429
  return gradio_temp
430
+
431
  # Default to system temp directory
432
  return tempfile.gettempdir()
433
+
 
434
  @staticmethod
435
  def resolve_file_reference(file_reference: str, agent=None) -> str:
436
  """
437
  Resolve file reference (filename or URL) to full file path.
438
+
439
  Args:
440
  file_reference (str): Original filename from user upload OR URL
441
  agent: Agent instance with file registry (optional)
442
+
443
  Returns:
444
  str: Full path to the file, or None if not found
445
  """
 
449
  # Download URL to temp file
450
  return FileUtils.download_file_to_path(file_reference)
451
  except Exception as e:
452
+ import logging
453
+ logger = logging.getLogger(__name__)
454
+ logger.error(f"Failed to download URL {file_reference}: {e}")
455
+ logger.error(f"Error type: {type(e).__name__}")
456
+ # Re-raise the exception to get more details
457
+ raise
458
+
459
  # It's a filename - resolve using agent's file registry
460
  if agent and hasattr(agent, 'get_file_path'):
461
  return agent.get_file_path(file_reference)
462
+
463
  return None
464
+
465
  @staticmethod
466
  def resolve_file_path(original_filename: str, agent=None) -> str:
467
  """
468
  Resolve original filename to full file path using agent's file registry.
469
+
470
  Args:
471
  original_filename (str): Original filename from user upload
472
  agent: Agent instance with file registry (optional)
473
+
474
  Returns:
475
  str: Full path to the file, or None if not found
476
  """
477
  if agent and hasattr(agent, 'get_file_path'):
478
  return agent.get_file_path(original_filename)
479
+
480
  return None
481
+
482
  @staticmethod
483
  def resolve_code_input(code_reference: str, agent=None) -> tuple[str, str]:
484
  """
485
  Resolve code reference to actual code content and detected language.
486
+
487
  Args:
488
  code_reference (str): Code content, filename, or URL
489
  agent: Agent instance for file resolution (optional)
490
+
491
  Returns:
492
  tuple: (code_content, detected_language)
493
  """
 
502
  return result.content, language
503
  except Exception as e:
504
  raise ValueError(f"Failed to download URL {code_reference}: {str(e)}")
505
+
506
  # Check if it's a file path (try to resolve via agent first, then direct path)
507
  file_path = None
508
  if agent and hasattr(agent, 'get_file_path'):
509
  file_path = agent.get_file_path(code_reference)
510
+
511
  if not file_path and os.path.exists(code_reference):
512
  file_path = code_reference
513
+
514
  if file_path and os.path.exists(file_path):
515
  result = FileUtils.read_text_file(file_path)
516
  if not result.success:
517
  raise ValueError(f"Failed to read file: {result.error}")
518
  language = FileUtils.detect_language_from_extension(file_path)
519
  return result.content, language
520
+
521
  # It's code content - return as-is with no language detection
522
  return code_reference, None
523
+
524
  @staticmethod
525
  def detect_language_from_extension(file_path: str) -> str:
526
  """Detect programming language from file extension."""
 
545
  '.swift': 'swift'
546
  }
547
  return extension_map.get(Path(file_path).suffix.lower(), 'python')
548
+
549
  @staticmethod
550
  def is_text_file(file_path: str) -> bool:
551
  """Check if file is likely a text file based on extension."""
 
555
  '.cfg', '.conf', '.env', '.csv', '.tsv'
556
  }
557
  return Path(file_path).suffix.lower() in text_extensions
558
+
559
  @staticmethod
560
  def is_image_file(file_path: str) -> bool:
561
  """Check if file is likely an image file based on extension."""
 
563
  '.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.webp', '.svg'
564
  }
565
  return Path(file_path).suffix.lower() in image_extensions
566
+
567
  @staticmethod
568
  def is_audio_file(file_path: str) -> bool:
569
  """Check if file is likely an audio file based on extension."""
 
571
  '.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a', '.wma'
572
  }
573
  return Path(file_path).suffix.lower() in audio_extensions
574
+
575
  @staticmethod
576
  def is_video_file(file_path: str) -> bool:
577
  """Check if file is likely a video file based on extension."""
 
579
  '.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm', '.mkv'
580
  }
581
  return Path(file_path).suffix.lower() in video_extensions
582
+
583
  @staticmethod
584
  def is_pdf_file(file_path: str) -> bool:
585
  """Check if file is likely a PDF file based on extension."""
tools/tools.py CHANGED
@@ -1399,40 +1399,182 @@ def combine_images(images_base64: List[str], operation: str,
1399
 
1400
  # ========== VIDEO/AUDIO UNDERSTANDING TOOLS ==========
1401
  @tool
1402
- def understand_video(youtube_url: str, prompt: str, system_prompt: str = None) -> str:
1403
- """
1404
- Analyze a YouTube video using Google Gemini's video understanding capabilities.
1405
- This tool can understand video content, extract information, and answer questions
1406
- about what happens in the video.
1407
- It uses the Gemini API and requires the GEMINI_KEY environment variable to be set.
 
 
 
 
 
 
 
 
 
1408
  Args:
1409
- youtube_url (str): The URL of the YouTube video to analyze.
1410
- prompt (str): A question or request regarding the video content.
1411
- system_prompt (str, optional): System prompt for formatting guidance.
 
 
 
 
 
 
 
 
 
 
 
1412
  Returns:
1413
- str: Analysis of the video content based on the prompt, or error message.
1414
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1415
  try:
1416
  client = _get_gemini_client()
1417
- # Create enhanced prompt with system prompt if provided
1418
- if system_prompt:
1419
- enhanced_prompt = f"{system_prompt}\n\nAnalyze the video at {youtube_url} and answer the following question:\n{prompt}\n\nProvide your answer in the required FINAL ANSWER format."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1420
  else:
1421
- enhanced_prompt = prompt
1422
- video_description = client.models.generate_content(
1423
- model="gemini-2.5-flash",
1424
- contents=types.Content(
1425
- parts=[
1426
- types.Part(file_data=types.FileData(file_uri=youtube_url)),
1427
- types.Part(text=enhanced_prompt)
1428
- ]
1429
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1430
  )
1431
- return json.dumps({
1432
- "type": "tool_response",
1433
- "tool_name": "understand_video",
1434
- "result": video_description.text
1435
- })
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1436
  except Exception as e:
1437
  return json.dumps({
1438
  "type": "tool_response",
@@ -1441,18 +1583,20 @@ def understand_video(youtube_url: str, prompt: str, system_prompt: str = None) -
1441
  })
1442
 
1443
  @tool
1444
- def understand_audio(file_reference: str, prompt: str, system_prompt: str = None, agent=None) -> str:
 
1445
  """
1446
  Analyze an audio file using Google Gemini's audio understanding capabilities.
1447
  This tool can transcribe audio, understand spoken content, and answer questions
1448
- about the audio content.
1449
- It uses the Gemini API and requires the GEMINI_KEY environment variable to be set.
1450
  The audio file is uploaded to Gemini and then analyzed with the provided prompt.
1451
  Args:
1452
  file_reference (str): Original filename from user upload OR URL to download OR base64 encoded audio data.
1453
  prompt (str): A question or request regarding the audio content.
1454
- system_prompt (str, optional): System prompt for formatting guidance.
1455
  agent: Agent instance for file resolution (injected automatically)
 
 
1456
  Returns:
1457
  str: Analysis of the audio content based on the prompt, or error message.
1458
  """
@@ -1480,6 +1624,14 @@ def understand_audio(file_reference: str, prompt: str, system_prompt: str = None
1480
  "error": f"Error uploading audio file to Gemini: {str(upload_error)}"
1481
  })
1482
  else:
 
 
 
 
 
 
 
 
1483
  # Try base64 fallback
1484
  try:
1485
  # Decode base64 and create temporary file
@@ -1499,16 +1651,28 @@ def understand_audio(file_reference: str, prompt: str, system_prompt: str = None
1499
  "tool_name": "understand_audio",
1500
  "error": f"Error processing audio data: {str(decode_error)}. Expected base64 encoded audio data, valid file path, or URL."
1501
  })
1502
- # Create enhanced prompt with system prompt if provided
1503
- if system_prompt:
1504
- enhanced_prompt = f"{system_prompt}\n\nAnalyze the audio file and answer the following question:\n{prompt}\n\nProvide your answer in the required FINAL ANSWER format."
1505
- else:
1506
- enhanced_prompt = prompt
 
 
 
 
 
1507
  contents = [enhanced_prompt, mp3_file]
 
 
 
 
 
 
1508
  try:
1509
  response = client.models.generate_content(
1510
  model="gemini-2.5-flash",
1511
- contents=contents
 
1512
  )
1513
  return json.dumps({
1514
  "type": "tool_response",
 
1399
 
1400
  # ========== VIDEO/AUDIO UNDERSTANDING TOOLS ==========
1401
  @tool
1402
+ def understand_video(file_reference: str, prompt: str, system_prompt: str = None, agent=None,
1403
+ start_time: str = None, end_time: str = None, fps: float = None) -> str:
1404
+ """
1405
+ Analyze a video using Google Gemini's video understanding capabilities.
1406
+ This tool can understand video content, extract information, answer questions,
1407
+ and provide transcriptions with timestamps. Supports video clipping and custom frame rates.
1408
+ Supports four input methods:
1409
+ 1. Uploaded video files - File size >20MB
1410
+ 2. Direct video URLs - File size >20MB
1411
+ 3. YouTube URLs - No size limit
1412
+ 4. Inline video data - For small videos <20MB
1413
+ Advanced features:
1414
+ - Video clipping: Specify start_time and end_time in MM:SS format (e.g., "02:30", "03:29")
1415
+ - Custom frame rate: Set fps for different sampling rates (default: 1 FPS)
1416
+ - Timestamp references: Use MM:SS format in prompts for specific video segments
1417
  Args:
1418
+ file_reference (str): Original filename from user upload OR direct video URL
1419
+ OR YouTube URL OR base64 encoded video data (<20MB)
1420
+ prompt (str): A question or request regarding the video content
1421
+ When referring to specific moments in a video within your prompt,
1422
+ use the MM:SS format (e.g., "01:15" for 1 minute and 15 seconds).
1423
+ system_prompt (str, optional): System instruction
1424
+ agent: Agent instance for file resolution (injected automatically)
1425
+ start_time (str, optional): Start time for video clipping in MM:SS format (e.g., "02:30")
1426
+ end_time (str, optional): End time for video clipping in MM:SS format (e.g., "03:29")
1427
+ fps (float, optional): Custom frame rate for video processing (default: 1 FPS).
1428
+ You might want to set low FPS (< 1) for long videos.
1429
+ This is especially useful for mostly static videos (e.g. lectures).
1430
+ If you want to capture more details in rapidly changing visuals,
1431
+ consider setting a higher FPS value.
1432
  Returns:
1433
+ str: Analysis of the video content based on the prompt, or error message
1434
  """
1435
+ from .file_utils import FileUtils
1436
+ def create_video_metadata():
1437
+ """Create video metadata for clipping and frame rate if specified."""
1438
+ def time_to_seconds(time_str):
1439
+ """Convert MM:SS or raw seconds to API-required seconds format with 's' suffix.
1440
+ Examples:
1441
+ "02:30" -> "150s"
1442
+ "1:15" -> "75s"
1443
+ "1250" -> "1250s"
1444
+ "1250s" -> "1250s"
1445
+ """
1446
+ if not time_str:
1447
+ return None
1448
+ # If already has 's' suffix, return as-is
1449
+ if time_str.endswith('s'):
1450
+ return time_str
1451
+ # Check if it's MM:SS format
1452
+ if ':' in time_str:
1453
+ parts = time_str.split(':')
1454
+ if len(parts) == 2:
1455
+ minutes, seconds = parts
1456
+ total_seconds = int(minutes) * 60 + int(seconds)
1457
+ return f"{total_seconds}s"
1458
+ # Assume it's already in seconds, add 's' suffix
1459
+ return f"{time_str}s"
1460
+ metadata = {}
1461
+ if start_time:
1462
+ metadata['start_offset'] = time_to_seconds(start_time)
1463
+ if end_time:
1464
+ metadata['end_offset'] = time_to_seconds(end_time)
1465
+ if fps is not None:
1466
+ metadata['fps'] = fps
1467
+ return metadata if metadata else None
1468
  try:
1469
  client = _get_gemini_client()
1470
+ if not client:
1471
+ return json.dumps({
1472
+ "type": "tool_response",
1473
+ "tool_name": "understand_video",
1474
+ "error": "Gemini client not available. Check GEMINI_KEY environment variable."
1475
+ })
1476
+ # Create video metadata if any advanced features are specified
1477
+ video_metadata = create_video_metadata()
1478
+ # Determine input type and handle accordingly
1479
+ video_part = None
1480
+ # Check if it's a YouTube URL (special handling)
1481
+ if file_reference.startswith(('https://www.youtube.com/', 'https://youtube.com/',
1482
+ 'https://youtu.be/', 'http://www.youtube.com/',
1483
+ 'http://youtube.com/', 'http://youtu.be/')):
1484
+ # YouTube URL - pass directly to Gemini with optional metadata
1485
+ if video_metadata:
1486
+ video_part = types.Part(
1487
+ file_data=types.FileData(file_uri=file_reference),
1488
+ video_metadata=types.VideoMetadata(**video_metadata)
1489
+ )
1490
+ else:
1491
+ video_part = types.Part(file_data=types.FileData(file_uri=file_reference))
1492
  else:
1493
+ # Try to resolve as file reference (uploaded file or regular URL)
1494
+ resolved_path = FileUtils.resolve_file_reference(file_reference, agent)
1495
+ if resolved_path:
1496
+ # It's a file (uploaded or downloaded from URL)
1497
+ try:
1498
+ uploaded_file = client.files.upload(file=resolved_path)
1499
+ if video_metadata:
1500
+ video_part = types.Part(
1501
+ file_data=types.FileData(file_uri=uploaded_file.uri),
1502
+ video_metadata=types.VideoMetadata(**video_metadata)
1503
+ )
1504
+ else:
1505
+ video_part = types.Part(file_data=types.FileData(file_uri=uploaded_file.uri))
1506
+ except Exception as upload_error:
1507
+ return json.dumps({
1508
+ "type": "tool_response",
1509
+ "tool_name": "understand_video",
1510
+ "error": f"Error uploading video file to Gemini: {str(upload_error)}"
1511
+ })
1512
+ else:
1513
+ # Try inline video data for small files (<20MB)
1514
+ try:
1515
+ # Decode base64 and use inline data (not temporary file)
1516
+ video_data = base64.b64decode(file_reference)
1517
+ # Check size limit (20MB = 20 * 1024 * 1024 bytes)
1518
+ if len(video_data) > 20 * 1024 * 1024:
1519
+ return json.dumps({
1520
+ "type": "tool_response",
1521
+ "tool_name": "understand_video",
1522
+ "error": "Video data too large for inline processing (>20MB). Please use file upload or URL instead."
1523
+ })
1524
+ # Use inline data for small videos with optional metadata
1525
+ if video_metadata:
1526
+ video_part = types.Part(
1527
+ inline_data=types.Blob(
1528
+ data=video_data,
1529
+ mime_type='video/mp4' # Default to mp4, could be detected from file extension
1530
+ ),
1531
+ video_metadata=types.VideoMetadata(**video_metadata)
1532
+ )
1533
+ else:
1534
+ video_part = types.Part(
1535
+ inline_data=types.Blob(
1536
+ data=video_data,
1537
+ mime_type='video/mp4' # Default to mp4, could be detected from file extension
1538
+ )
1539
+ )
1540
+ except Exception as decode_error:
1541
+ return json.dumps({
1542
+ "type": "tool_response",
1543
+ "tool_name": "understand_video",
1544
+ "error": f"Error processing video data: {str(decode_error)}. Expected base64 encoded video data (<20MB), valid file path, YouTube URL, or direct video URL."
1545
+ })
1546
+ # Don't embed system_prompt in user prompt - use API parameter instead
1547
+ enhanced_prompt = prompt
1548
+ # Generate content using the video
1549
+ contents = types.Content(
1550
+ parts=[
1551
+ video_part,
1552
+ types.Part(text=enhanced_prompt)
1553
+ ]
1554
  )
1555
+ # Create config with system_instruction if provided
1556
+ config = None
1557
+ if system_prompt:
1558
+ config = types.GenerateContentConfig(
1559
+ system_instruction=system_prompt
1560
+ )
1561
+ try:
1562
+ response = client.models.generate_content(
1563
+ model="gemini-2.5-flash",
1564
+ contents=contents,
1565
+ config=config
1566
+ )
1567
+ return json.dumps({
1568
+ "type": "tool_response",
1569
+ "tool_name": "understand_video",
1570
+ "result": response.text
1571
+ })
1572
+ except Exception as e:
1573
+ return json.dumps({
1574
+ "type": "tool_response",
1575
+ "tool_name": "understand_video",
1576
+ "error": f"Error in video understanding request: {str(e)}"
1577
+ })
1578
  except Exception as e:
1579
  return json.dumps({
1580
  "type": "tool_response",
 
1583
  })
1584
 
1585
  @tool
1586
+ def understand_audio(file_reference: str, prompt: str, system_prompt: str = None, agent=None,
1587
+ start_time: str = None, end_time: str = None) -> str:
1588
  """
1589
  Analyze an audio file using Google Gemini's audio understanding capabilities.
1590
  This tool can transcribe audio, understand spoken content, and answer questions
1591
+ about the audio content. Supports timestamp references in prompts (MM:SS format).
 
1592
  The audio file is uploaded to Gemini and then analyzed with the provided prompt.
1593
  Args:
1594
  file_reference (str): Original filename from user upload OR URL to download OR base64 encoded audio data.
1595
  prompt (str): A question or request regarding the audio content.
1596
+ system_prompt (str, optional): System instruction.
1597
  agent: Agent instance for file resolution (injected automatically)
1598
+ start_time (str, optional): Start time reference in MM:SS format (e.g., "02:30")
1599
+ end_time (str, optional): End time reference in MM:SS format (e.g., "03:29")
1600
  Returns:
1601
  str: Analysis of the audio content based on the prompt, or error message.
1602
  """
 
1624
  "error": f"Error uploading audio file to Gemini: {str(upload_error)}"
1625
  })
1626
  else:
1627
+ # Check if it looks like a URL that failed to download
1628
+ if file_reference.startswith(('http://', 'https://', 'ftp://')):
1629
+ return json.dumps({
1630
+ "type": "tool_response",
1631
+ "tool_name": "understand_audio",
1632
+ "error": f"Failed to download audio from URL: {file_reference}. Please check the URL is accessible and try again."
1633
+ })
1634
+
1635
  # Try base64 fallback
1636
  try:
1637
  # Decode base64 and create temporary file
 
1651
  "tool_name": "understand_audio",
1652
  "error": f"Error processing audio data: {str(decode_error)}. Expected base64 encoded audio data, valid file path, or URL."
1653
  })
1654
+ # Create enhanced prompt with timestamp references if provided
1655
+ timestamp_instruction = ""
1656
+ if start_time and end_time:
1657
+ timestamp_instruction = f" Focus on the audio segment from {start_time} to {end_time}."
1658
+ elif start_time:
1659
+ timestamp_instruction = f" Focus on the audio segment starting from {start_time}."
1660
+ elif end_time:
1661
+ timestamp_instruction = f" Focus on the audio segment up to {end_time}."
1662
+ # Build prompt with timestamp instructions only
1663
+ enhanced_prompt = f"{prompt}\n\n{timestamp_instruction}"
1664
  contents = [enhanced_prompt, mp3_file]
1665
+ # Create config with system_instruction if provided
1666
+ config = None
1667
+ if system_prompt:
1668
+ config = types.GenerateContentConfig(
1669
+ system_instruction=system_prompt
1670
+ )
1671
  try:
1672
  response = client.models.generate_content(
1673
  model="gemini-2.5-flash",
1674
+ contents=contents,
1675
+ config=config
1676
  )
1677
  return json.dumps({
1678
  "type": "tool_response",