agentzero07 committed
Commit 1e40a5f · verified · 1 Parent(s): 436faa4

Delete tools.py

Files changed (1):
  1. tools.py +0 -434
tools.py DELETED
@@ -1,434 +0,0 @@
from langchain.tools import DuckDuckGoSearchResults, WikipediaQueryRun
from langchain.utilities import WikipediaAPIWrapper
from PIL import Image
import re
import time
import json
import pandas as pd
from pathlib import Path
from typing import List, Dict, Optional, Union
from tabulate import tabulate
import whisper

import numpy as np
import os
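
# NOTE (editor): the methods below call a global `llm` that this file never
# defines or imports. A minimal sketch of what the surrounding app presumably
# provided (model name and wiring are assumptions, not taken from this commit):
#
#     from langchain_google_genai import ChatGoogleGenerativeAI
#     llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash",
#                                  google_api_key=os.getenv("GEMINI_API_KEY"))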

# ----------- Enhanced Search Functionality -----------
class EnhancedSearchTool:
    """Enhanced web search with intelligent query processing and result filtering"""

    def __init__(self, max_results: int = 10):
        self.base_tool = DuckDuckGoSearchResults(num_results=max_results)
        self.max_results = max_results

    def _extract_key_terms(self, question: str) -> List[str]:
        """Extract key search terms from the question using the LLM"""
        try:
            extract_prompt = f"""
            Extract the most important search terms from this question for web search:
            Question: {question}

            Return ONLY a comma-separated list of key terms, no explanations.
            Focus on: proper nouns, specific concepts, technical terms, dates, numbers.
            Avoid: common words like 'what', 'how', 'when', 'the', 'is', 'are'.

            Example: "What is the population of Tokyo in 2023?" -> "Tokyo population 2023"
            """

            response = llm.invoke(extract_prompt).content.strip()
            return [term.strip() for term in response.split(',')]
        except Exception:
            # Fall back to simple keyword extraction
            return self._simple_keyword_extraction(question)

    def _simple_keyword_extraction(self, question: str) -> List[str]:
        """Fallback keyword extraction using regex"""
        # Remove common question words
        stop_words = {'what', 'how', 'when', 'where', 'why', 'who', 'which', 'the', 'is', 'are', 'was', 'were', 'do', 'does', 'did', 'can', 'could', 'should', 'would'}
        words = re.findall(r'\b[A-Za-z]+\b', question.lower())
        return [word for word in words if word not in stop_words and len(word) > 2]

    def _generate_search_queries(self, question: str) -> List[str]:
        """Generate multiple search queries for comprehensive results"""
        key_terms = self._extract_key_terms(question)

        queries = []

        # Original question (cleaned)
        cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
        queries.append(cleaned_question)

        # Key terms combined
        if key_terms:
            queries.append(' '.join(key_terms[:5]))  # Top 5 terms

        # Specific query patterns based on question type
        if any(word in question.lower() for word in ['latest', 'recent', 'current', 'new']):
            queries.append(f"{' '.join(key_terms[:3])} 2024 2025")

        if any(word in question.lower() for word in ['statistics', 'data', 'number', 'count']):
            queries.append(f"{' '.join(key_terms[:3])} statistics data")

        if any(word in question.lower() for word in ['definition', 'what is', 'meaning']):
            queries.append(f"{' '.join(key_terms[:2])} definition meaning")

        return list(dict.fromkeys(queries))  # Remove duplicates while preserving order

    def _filter_and_rank_results(self, results: List[Dict], question: str) -> List[Dict]:
        """Filter and rank search results based on relevance"""
        if not results:
            return results

        key_terms = self._extract_key_terms(question)
        key_terms_lower = [term.lower() for term in key_terms]

        scored_results = []
        for result in results:
            score = 0
            text_content = (result.get('snippet', '') + ' ' + result.get('title', '')).lower()

            # Score based on key term matches
            for term in key_terms_lower:
                if term in text_content:
                    score += text_content.count(term)

            # Bonus for recent dates
            if any(year in text_content for year in ['2024', '2025', '2023']):
                score += 2

            # Penalty for very short snippets
            if len(result.get('snippet', '')) < 50:
                score -= 1

            scored_results.append((score, result))

        # Sort by score and return top results
        scored_results.sort(key=lambda x: x[0], reverse=True)
        return [result for score, result in scored_results[:self.max_results]]

    def run(self, question: str) -> str:
        """Enhanced search execution with multiple queries and result filtering"""
        try:
            search_queries = self._generate_search_queries(question)
            all_results = []

            for query in search_queries[:3]:  # Limit to 3 queries to avoid rate limits
                try:
                    results = self.base_tool.run(query)
                    if isinstance(results, str):
                        # Parse string results if needed
                        try:
                            results = json.loads(results) if results.startswith('[') else [{'snippet': results, 'title': 'Search Result'}]
                        except json.JSONDecodeError:
                            results = [{'snippet': results, 'title': 'Search Result'}]

                    if isinstance(results, list):
                        all_results.extend(results)

                    time.sleep(0.5)  # Rate limiting
                except Exception as e:
                    print(f"Search query failed: {query} - {e}")
                    continue

            if not all_results:
                return "No search results found."

            # Filter and rank results
            filtered_results = self._filter_and_rank_results(all_results, question)

            # Format results
            formatted_results = []
            for i, result in enumerate(filtered_results[:5], 1):
                title = result.get('title', 'No title')
                snippet = result.get('snippet', 'No description')
                link = result.get('link', '')

                formatted_results.append(f"{i}. {title}\n   {snippet}\n   Source: {link}\n")

            return "ENHANCED SEARCH RESULTS:\n" + "\n".join(formatted_results)

        except Exception as e:
            return f"Enhanced search error: {str(e)}"
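
# Editor's illustration (hypothetical helper, not part of the original file):
# a minimal way to exercise the class above. Requires network access for
# DuckDuckGo; if `llm` is unavailable, the regex fallback handles term extraction.
def _demo_enhanced_search() -> None:
    search = EnhancedSearchTool(max_results=5)
    print(search.run("What is the population of Tokyo in 2023?"))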

# ----------- Enhanced Wikipedia Tool -----------
class EnhancedWikipediaTool:
    """Enhanced Wikipedia search with intelligent query processing and content extraction"""

    def __init__(self):
        self.base_wrapper = WikipediaAPIWrapper(
            top_k_results=3,
            doc_content_chars_max=3000,
            load_all_available_meta=True
        )
        self.base_tool = WikipediaQueryRun(api_wrapper=self.base_wrapper)

    def _extract_entities(self, question: str) -> List[str]:
        """Extract named entities for Wikipedia search"""
        try:
            entity_prompt = f"""
            Extract named entities (people, places, organizations, concepts) from this question for Wikipedia search:
            Question: {question}

            Return ONLY a comma-separated list of the most important entities.
            Focus on: proper nouns, specific names, places, organizations, historical events, scientific concepts.

            Example: "Tell me about Einstein's theory of relativity" -> "Albert Einstein, theory of relativity, relativity"
            """

            response = llm.invoke(entity_prompt).content.strip()
            entities = [entity.strip() for entity in response.split(',')]
            return [e for e in entities if len(e) > 2]
        except Exception:
            # Fallback: extract capitalized words and phrases
            return self._extract_capitalized_terms(question)

    def _extract_capitalized_terms(self, question: str) -> List[str]:
        """Fallback: extract capitalized terms as potential entities"""
        # Find capitalized words and multi-word phrases
        capitalized_words = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', question)
        # Also look for quoted terms
        quoted_terms = re.findall(r'"([^"]+)"', question)
        quoted_terms.extend(re.findall(r"'([^']+)'", question))

        return capitalized_words + quoted_terms

    def _search_multiple_terms(self, entities: List[str]) -> Dict[str, str]:
        """Search Wikipedia for multiple entities and return the best results"""
        results = {}

        for entity in entities[:3]:  # Limit to avoid too many API calls
            try:
                result = self.base_tool.run(entity)
                if result and "Page:" in result and len(result) > 100:
                    results[entity] = result
                time.sleep(0.5)  # Rate limiting
            except Exception as e:
                print(f"Wikipedia search failed for '{entity}': {e}")
                continue

        return results

    def _extract_relevant_sections(self, content: str, question: str) -> str:
        """Extract the most relevant sections from Wikipedia content"""
        if not content or len(content) < 200:
            return content

        # Split content into sections (usually separated by double newlines)
        sections = re.split(r'\n\s*\n', content)

        # Score sections based on relevance to the question
        key_terms = self._extract_entities(question)
        key_terms_lower = [term.lower() for term in key_terms]

        scored_sections = []
        for section in sections:
            if len(section.strip()) < 50:
                continue

            score = 0
            section_lower = section.lower()

            # Score based on key term matches
            for term in key_terms_lower:
                score += section_lower.count(term)

            # Bonus for sections with dates, numbers, or specific facts
            if re.search(r'\b(19|20)\d{2}\b', section):  # Years
                score += 1
            if re.search(r'\b\d+([.,]\d+)?\s*(million|billion|thousand|percent|%)\b', section):
                score += 1

            scored_sections.append((score, section))

        # Sort by relevance and take the top sections
        scored_sections.sort(key=lambda x: x[0], reverse=True)
        top_sections = [section for score, section in scored_sections[:3] if score > 0]

        if not top_sections:
            # If no highly relevant sections, take the first few sections
            top_sections = sections[:2]

        return '\n\n'.join(top_sections)

    def run(self, question: str) -> str:
        """Enhanced Wikipedia search with entity extraction and content filtering"""
        try:
            entities = self._extract_entities(question)

            if not entities:
                # Fallback to a direct search with the cleaned question
                cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
                try:
                    result = self.base_tool.run(cleaned_question)
                    return self._extract_relevant_sections(result, question) if result else "No Wikipedia results found."
                except Exception as e:
                    return f"Wikipedia search error: {str(e)}"

            # Search for multiple entities
            search_results = self._search_multiple_terms(entities)

            if not search_results:
                return "No relevant Wikipedia articles found."

            # Combine and format results
            formatted_results = []
            for entity, content in search_results.items():
                relevant_content = self._extract_relevant_sections(content, question)
                if relevant_content:
                    formatted_results.append(f"=== {entity} ===\n{relevant_content}")

            if not formatted_results:
                return "No relevant information found in Wikipedia articles."

            return "ENHANCED WIKIPEDIA RESULTS:\n\n" + "\n\n".join(formatted_results)

        except Exception as e:
            return f"Enhanced Wikipedia error: {str(e)}"
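
# Editor's illustration (hypothetical helper, not in the original file): the
# Wikipedia tool takes a full natural-language question, not a bare article title.
def _demo_enhanced_wikipedia() -> None:
    wiki = EnhancedWikipediaTool()
    print(wiki.run("Tell me about Einstein's theory of relativity"))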

# ----------- Enhanced File Processing Tools -----------
def excel_to_markdown(inputs: dict) -> str:
    """Enhanced Excel tool with better error handling and data analysis"""
    try:
        excel_path = inputs["excel_path"]
        sheet_name = inputs.get("sheet_name", None)
        file_path = Path(excel_path).expanduser().resolve()
        if not file_path.is_file():
            return f"Error: Excel file not found at {file_path}"

        sheet: Union[str, int] = (
            int(sheet_name) if sheet_name and sheet_name.isdigit() else sheet_name or 0
        )
        df = pd.read_excel(file_path, sheet_name=sheet)

        # Enhanced metadata
        metadata = "EXCEL FILE ANALYSIS:\n"
        metadata += f"File: {file_path.name}\n"
        metadata += f"Dimensions: {len(df)} rows × {len(df.columns)} columns\n"
        metadata += f"Columns: {', '.join(df.columns.tolist())}\n"

        # Data type information
        metadata += f"Data types: {dict(df.dtypes)}\n"

        # Basic statistics for numeric columns
        numeric_cols = df.select_dtypes(include=['number']).columns
        if len(numeric_cols) > 0:
            metadata += f"Numeric columns: {list(numeric_cols)}\n"
            for col in numeric_cols[:3]:  # Limit to first 3 numeric columns
                metadata += f"  {col}: mean={df[col].mean():.2f}, min={df[col].min()}, max={df[col].max()}\n"

        metadata += "\nSAMPLE DATA (first 10 rows):\n"

        if hasattr(df, "to_markdown"):
            sample_data = df.head(10).to_markdown(index=False)
        else:
            sample_data = tabulate(df.head(10), headers="keys", tablefmt="github", showindex=False)

        return metadata + sample_data + f"\n\n(Showing first 10 rows of {len(df)} total rows)"

    except Exception as e:
        return f"Error reading Excel file: {str(e)}"
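
# Editor's illustration (hypothetical path and helper, not in the original
# file): the function takes a dict rather than positional arguments, matching
# a structured-tool calling convention; `sheet_name` may be a name or an index.
def _demo_excel_to_markdown() -> None:
    print(excel_to_markdown({"excel_path": "~/data/report.xlsx", "sheet_name": "0"}))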

def image_file_info(image_path: str, question: str) -> str:
    """Enhanced image file analysis using the Gemini API"""
    try:
        import mimetypes

        from google import genai
        from google.genai.types import Part

        client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))

        # Read content from a local file
        with open(image_path, "rb") as f:
            img_bytes = f.read()

        # Guess the MIME type from the file name instead of assuming JPEG
        mime_type, _ = mimetypes.guess_type(image_path)
        mime_type = mime_type or "image/jpeg"

        response = client.models.generate_content(
            model="gemini-2.5-flash-preview-05-20",
            contents=[
                question,
                Part.from_bytes(data=img_bytes, mime_type=mime_type)
            ],
        )
        return response.text

    except Exception as e:
        return f"Error during image analysis: {e}"

def audio_file_info(audio_path: str) -> str:
    """Returns only the transcription of an audio file."""
    try:
        model = whisper.load_model("tiny")  # Smallest and fastest Whisper checkpoint
        result = model.transcribe(audio_path, fp16=False)
        return result['text']
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"
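
# Editor's sketch (assumption, not in the original file): Whisper ships
# checkpoints "tiny", "base", "small", "medium", and "large"; accuracy rises
# with size at the cost of speed. The size could be exposed as a parameter:
def _transcribe(audio_path: str, model_size: str = "tiny") -> str:
    return whisper.load_model(model_size).transcribe(audio_path, fp16=False)["text"]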

def code_file_read(code_path: str) -> str:
    """Enhanced code file analysis"""
    try:
        with open(code_path, "r", encoding="utf-8") as f:
            content = f.read()

        file_path = Path(code_path)

        info = "CODE FILE ANALYSIS:\n"
        info += f"File: {file_path.name}\n"
        info += f"Extension: {file_path.suffix}\n"
        info += f"Size: {len(content)} characters, {len(content.splitlines())} lines\n"

        # Language-specific analysis
        if file_path.suffix == '.py':
            # Python-specific analysis
            import_lines = [line for line in content.splitlines() if line.strip().startswith(('import ', 'from '))]
            if import_lines:
                info += f"Imports ({len(import_lines)}): {', '.join(import_lines[:5])}\n"

            # Count top-level functions and classes
            func_count = len(re.findall(r'^def\s+\w+', content, re.MULTILINE))
            class_count = len(re.findall(r'^class\s+\w+', content, re.MULTILINE))
            info += f"Functions: {func_count}, Classes: {class_count}\n"

        info += f"\nCODE CONTENT:\n{content}"
        return info

    except Exception as e:
        return f"Error reading code file: {e}"

import yt_dlp


def extract_youtube_info(question: str) -> str:
    """
    Extract a YouTube URL from the question, download the video with yt-dlp
    (no merging), and return its audio transcription.

    Parameters:
    - question: str — text that contains a YouTube URL

    Returns:
    - str: transcription of the downloaded file, or an error message
    """
    pattern = r"(https?://(?:www\.)?(?:youtube\.com/watch\?v=[\w\-]+|youtu\.be/[\w\-]+))"
    match = re.search(pattern, question)
    youtube_url = match.group(1) if match else None
    if not youtube_url:
        return "Error: no YouTube URL found in the question."
    print(f"Extracting YouTube URL: {youtube_url}")

    match = re.search(r"(?:v=|\/)([a-zA-Z0-9_-]{11})", youtube_url)
    video_id = match.group(1) if match else "dummy_id"
    file_path = Path(video_id)

    output_dir = file_path.parent
    output_dir.mkdir(parents=True, exist_ok=True)

    ydl_opts = {
        'format': 'best[ext=mp4]/best',  # best combined mp4 stream, or fall back to best available
        'outtmpl': str(file_path),
        'quiet': True,
        'no_warnings': True,
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([youtube_url])
        return audio_file_info(str(file_path))
    except Exception as e:
        return f"Error: {e}"
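
# Editor's sketch (assumption, not from this commit): a tools.py like this is
# typically consumed by an agent that wraps the callables as LangChain tools.
# Tool names and descriptions below are illustrative only.
def build_tools():
    from langchain.tools import Tool
    return [
        Tool(name="enhanced_search", func=EnhancedSearchTool().run,
             description="Web search with query expansion and result ranking"),
        Tool(name="enhanced_wikipedia", func=EnhancedWikipediaTool().run,
             description="Entity-aware Wikipedia lookup"),
        Tool(name="youtube_transcript", func=extract_youtube_info,
             description="Download a YouTube video and return its transcription"),
    ]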