import json
import mimetypes
import os
import re
import tempfile
import time
from pathlib import Path
from typing import List, Dict, Optional, Union

import numpy as np
import pandas as pd
import whisper
import yt_dlp
from langchain_community.tools import DuckDuckGoSearchResults, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from tabulate import tabulate
|
|
| |
| |
| |
# Name of the Whisper checkpoint to load. Defaults to "tiny" (the previously
# hard-coded value); set the WHISPER_MODEL_NAME environment variable to override.
WHISPER_MODEL_NAME = os.getenv("WHISPER_MODEL_NAME") or "tiny"

# Load the model once at import time so every transcription call reuses the
# same in-memory instance instead of reloading it per request.
try:
    print("Pre-loading Whisper model globally...")
    WHISPER_MODEL = whisper.load_model(WHISPER_MODEL_NAME)
except Exception as e:
    # Keep the module importable when loading fails; WHISPER_MODEL is left as
    # None so callers can detect that no model is available.
    print(f"Warning: Failed to load Whisper globally: {e}")
    WHISPER_MODEL = None
|
|
| |
class EnhancedSearchTool:
    """Enhanced web search with intelligent query processing and result filtering.

    The tool derives several query variants from a question, runs up to three
    of them through ``DuckDuckGoSearchResults``, parses the raw tool output
    into ``{'snippet', 'title', 'link'}`` dictionaries, ranks them by keyword
    overlap and returns the top five as a formatted string.

    Args:
        llm: Optional object exposing ``invoke(prompt).content``; when given it
            is used for keyword extraction, otherwise a regex fallback is used.
        max_results: Number of results requested from the search tool and the
            maximum number kept after ranking.
        request_delay: Seconds to wait between consecutive search requests.
    """

    # Question words and auxiliaries ignored by the regex keyword fallback.
    _STOP_WORDS = frozenset({
        'what', 'how', 'when', 'where', 'why', 'who', 'which', 'the', 'is',
        'are', 'was', 'were', 'do', 'does', 'did', 'can', 'could', 'should',
        'would',
    })

    # Matches the bracketed "[snippet: ..., title: ..., link: ...]" text form.
    _RESULT_PATTERN = re.compile(
        r'\[snippet:\s*(.*?),\s*title:\s*(.*?),\s*link:\s*(.*?)\]', re.DOTALL
    )

    # Years used for the "recent" query variant and the recency score bonus.
    _RECENT_YEARS = ('2025', '2026')
    _PREVIOUS_YEAR = '2024'

    def __init__(self, llm=None, max_results: int = 10, request_delay: float = 0.5):
        self.base_tool = DuckDuckGoSearchResults(num_results=max_results)
        self.max_results = max_results
        self.llm = llm
        self.request_delay = request_delay

    def _extract_key_terms(self, question: str) -> List[str]:
        """Extract key search terms from the question using LLM or regex fallback.

        Empty terms (e.g. from a trailing comma or an empty LLM reply) are
        discarded; if nothing usable remains the regex fallback is used.
        """
        if self.llm:
            try:
                extract_prompt = f"""
                Extract the most important search terms from this question for web search:
                Question: {question}

                Return ONLY a comma-separated list of key terms, no explanations.
                Focus on: proper nouns, specific concepts, technical terms, dates, numbers.
                Avoid: common words like 'what', 'how', 'when', 'the', 'is', 'are'.

                Example: "What is the population of Tokyo in 2025?" -> "Tokyo population 2025"
                """
                response = self.llm.invoke(extract_prompt).content.strip()
                terms = [term.strip() for term in response.split(',')]
                terms = [term for term in terms if term]
                if terms:
                    return terms
            except Exception as e:
                print(f"LLM keyword extraction failed, using fallback: {e}")

        return self._simple_keyword_extraction(question)

    def _simple_keyword_extraction(self, question: str) -> List[str]:
        """Fallback keyword extraction using regex.

        Returns the lower-cased alphabetic words longer than two characters
        that are not stop words, in order of appearance.
        """
        words = re.findall(r'\b[A-Za-z]+\b', question.lower())
        return [
            word for word in words
            if word not in self._STOP_WORDS and len(word) > 2
        ]

    @staticmethod
    def _mentions(text: str, triggers) -> bool:
        """Return True when any trigger occurs in *text* as a whole word or phrase."""
        return any(
            re.search(rf'\b{re.escape(trigger)}\b', text) for trigger in triggers
        )

    def _generate_search_queries(
        self, question: str, key_terms: Optional[List[str]] = None
    ) -> List[str]:
        """Generate multiple search queries for comprehensive results.

        Args:
            question: The user question.
            key_terms: Pre-computed key terms; extracted from the question
                when omitted.

        Returns:
            De-duplicated queries in priority order: the punctuation-stripped
            question, the joined key terms, then recency / statistics /
            definition variants when the question contains a matching trigger.
        """
        if key_terms is None:
            key_terms = self._extract_key_terms(question)

        queries = []
        cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
        if cleaned_question:
            queries.append(cleaned_question)

        # Without key terms the variants below would degenerate into queries
        # such as " definition meaning", so they are skipped entirely.
        if key_terms:
            question_lower = question.lower()
            queries.append(' '.join(key_terms[:5]))

            if self._mentions(question_lower, ['latest', 'recent', 'current', 'new']):
                queries.append(
                    f"{' '.join(key_terms[:3])} {' '.join(self._RECENT_YEARS)}"
                )

            if self._mentions(question_lower, ['statistics', 'data', 'number', 'count']):
                queries.append(f"{' '.join(key_terms[:3])} statistics data")

            if self._mentions(question_lower, ['definition', 'what is', 'meaning']):
                queries.append(f"{' '.join(key_terms[:2])} definition meaning")

        # dict.fromkeys drops duplicates while preserving order.
        return list(dict.fromkeys(queries))

    def _filter_and_rank_results(
        self,
        results: List[Dict],
        question: str,
        key_terms: Optional[List[str]] = None,
    ) -> List[Dict]:
        """Filter and rank search results based on relevance and timeliness.

        Each result scores one point per key-term occurrence in its snippet
        and title, +3 when a recent year is mentioned (+1 for the previous
        year), and -1 when the snippet is shorter than 50 characters.

        Returns:
            At most ``max_results`` results, highest score first.
        """
        if not results:
            return results

        if key_terms is None:
            key_terms = self._extract_key_terms(question)
        # An empty term is a substring of every text and would inflate scores.
        key_terms_lower = [term.lower() for term in key_terms if term and term.strip()]

        scored_results = []
        for result in results:
            snippet = str(result.get('snippet') or '')
            title = str(result.get('title') or '')
            text_content = f"{snippet} {title}".lower()

            score = sum(text_content.count(term) for term in key_terms_lower)

            if any(year in text_content for year in self._RECENT_YEARS):
                score += 3
            elif self._PREVIOUS_YEAR in text_content:
                score += 1

            if len(snippet) < 50:
                score -= 1

            scored_results.append((score, result))

        # list.sort is stable, so equally scored results keep their order.
        scored_results.sort(key=lambda pair: pair[0], reverse=True)
        return [result for _, result in scored_results[:self.max_results]]

    def _parse_raw_results(self, raw) -> List[Dict]:
        """Convert raw search-tool output into a list of result dictionaries.

        Handles a list of dicts, a JSON array string, the bracketed
        ``[snippet: ..., title: ..., link: ...]`` text form, and finally any
        other non-blank string (wrapped as a single untitled result).
        """
        if isinstance(raw, list):
            return [item for item in raw if isinstance(item, dict)]
        if not isinstance(raw, str) or not raw.strip():
            return []

        if raw.lstrip().startswith('['):
            try:
                decoded = json.loads(raw)
            except json.JSONDecodeError:
                decoded = None  # not JSON; try the bracketed text form below
            if isinstance(decoded, list):
                items = [item for item in decoded if isinstance(item, dict)]
                if items:
                    return items

        matches = self._RESULT_PATTERN.findall(raw)
        if matches:
            return [
                {'snippet': snippet, 'title': title, 'link': link}
                for snippet, title, link in matches
            ]

        return [{'snippet': raw, 'title': 'Search Result'}]

    def run(self, question: str) -> str:
        """Run the enhanced search and return the formatted top results.

        Key terms are extracted once and reused for query generation and
        ranking. Results returned by more than one query are kept only once.

        Returns:
            A string starting with ``"ENHANCED SEARCH RESULTS:"``,
            ``"No search results found."`` when nothing came back, or an
            ``"Enhanced search error: ..."`` message on unexpected failure.
        """
        try:
            key_terms = self._extract_key_terms(question)
            search_queries = self._generate_search_queries(question, key_terms)

            all_results = []
            seen = set()
            for index, query in enumerate(search_queries[:3]):
                if index and self.request_delay > 0:
                    time.sleep(self.request_delay)  # pause between requests
                try:
                    raw = self.base_tool.run(query)
                except Exception as e:
                    print(f"Search query failed: {query} - {e}")
                    continue

                for item in self._parse_raw_results(raw):
                    # Identify a hit by its link, or by title + snippet when
                    # no link is available.
                    identity = str(item.get('link') or '') or (
                        f"{item.get('title')}\x00{item.get('snippet')}"
                    )
                    if identity in seen:
                        continue
                    seen.add(identity)
                    all_results.append(item)

            if not all_results:
                return "No search results found."

            filtered_results = self._filter_and_rank_results(
                all_results, question, key_terms
            )

            formatted_results = []
            for i, result in enumerate(filtered_results[:5], 1):
                title = result.get('title', 'No title')
                snippet = result.get('snippet', 'No description')
                link = result.get('link', 'N/A')
                formatted_results.append(f"{i}. {title}\n {snippet}\n Source: {link}\n")

            return "ENHANCED SEARCH RESULTS:\n" + "\n".join(formatted_results)

        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            return f"Enhanced search error: {str(e)}"
|
|
| |
class EnhancedWikipediaTool:
    """Enhanced Wikipedia search with intelligent query processing and content extraction.

    Entities are extracted from the question (via the LLM when available,
    otherwise by regex), up to three of them are looked up through
    ``WikipediaQueryRun``, and the most relevant sections of each article are
    returned as one formatted string.

    Args:
        llm: Optional object exposing ``invoke(prompt).content`` used for
            entity extraction.
        request_delay: Seconds to wait between consecutive Wikipedia lookups.
    """

    # Leading words stripped from capitalized matches in the regex fallback:
    # they are capitalized only because they start the sentence.
    _QUESTION_WORDS = frozenset({
        'what', 'who', 'whom', 'whose', 'when', 'where', 'why', 'which',
        'how', 'is', 'are', 'was', 'were', 'do', 'does', 'did', 'can',
        'could', 'should', 'would', 'tell', 'the',
    })

    def __init__(self, llm=None, request_delay: float = 0.5):
        self.base_wrapper = WikipediaAPIWrapper(
            top_k_results=3,
            doc_content_chars_max=3000,
            load_all_available_meta=True
        )
        self.base_tool = WikipediaQueryRun(api_wrapper=self.base_wrapper)
        self.llm = llm
        self.request_delay = request_delay

    def _extract_entities(self, question: str) -> List[str]:
        """Extract named entities for Wikipedia search.

        Uses the LLM when one is configured; falls back to the regex-based
        extraction when the LLM fails or yields no entity longer than two
        characters.
        """
        if self.llm:
            try:
                entity_prompt = f"""
                Extract named entities (people, places, organizations, concepts) from this question for Wikipedia search:
                Question: {question}

                Return ONLY a comma-separated list of the most important entities.
                Focus on: proper nouns, specific names, places, organizations, historical events, scientific concepts.

                Example: "Tell me about Einstein's theory of relativity" -> "Albert Einstein, theory of relativity"
                """
                response = self.llm.invoke(entity_prompt).content.strip()
                entities = [entity.strip() for entity in response.split(',')]
                entities = [e for e in entities if len(e) > 2]
                if entities:
                    return entities
            except Exception as e:
                print(f"LLM entity extraction failed: {e}")

        return self._extract_capitalized_terms(question)

    def _extract_capitalized_terms(self, question: str) -> List[str]:
        """Fallback: extract capitalized terms and quoted phrases as entities.

        Leading question words ("What", "Tell", ...) are stripped from
        capitalized matches, single quotes are only treated as quotation marks
        when they are not inside a word (so apostrophes are ignored), and
        duplicates are removed while preserving order.
        """
        capitalized_terms = []
        for candidate in re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', question):
            words = candidate.split()
            while words and words[0].lower() in self._QUESTION_WORDS:
                words.pop(0)
            if words:
                capitalized_terms.append(' '.join(words))

        quoted_terms = re.findall(r'"([^"]+)"', question)
        # The lookarounds reject apostrophes such as in "Einstein's" / "isn't".
        quoted_terms.extend(re.findall(r"(?<!\w)'([^']+)'(?!\w)", question))

        return list(dict.fromkeys(capitalized_terms + quoted_terms))

    def _search_multiple_terms(self, entities: List[str]) -> Dict[str, str]:
        """Look up the first three entities and return ``{entity: article text}``.

        A lookup is kept only when its text contains ``"Page:"`` and is longer
        than 100 characters; failed lookups are logged and skipped.
        """
        results = {}
        for index, entity in enumerate(entities[:3]):
            if index and self.request_delay > 0:
                time.sleep(self.request_delay)  # pause between requests
            try:
                result = self.base_tool.run(entity)
            except Exception as e:
                print(f"Wikipedia search failed for '{entity}': {e}")
                continue
            if result and "Page:" in result and len(result) > 100:
                results[entity] = result
        return results

    def _extract_relevant_sections(
        self,
        content: str,
        question: str,
        key_terms: Optional[List[str]] = None,
    ) -> str:
        """Return the sections of *content* most relevant to the question.

        Content is split on blank lines. Sections shorter than 50 characters
        are ignored; the rest score one point per key-term occurrence, +1 for
        a 20xx year and +1 for a quantity such as "3.5 million" or "12%".

        Args:
            content: Article text.
            question: The user question.
            key_terms: Pre-computed entities; extracted from the question
                when omitted.

        Returns:
            Up to three positively scored sections joined by blank lines, or
            the first two sections when none scores. Content shorter than 200
            characters is returned unchanged.
        """
        if not content or len(content) < 200:
            return content

        if key_terms is None:
            key_terms = self._extract_entities(question)
        key_terms_lower = [term.lower() for term in key_terms if term]

        sections = re.split(r'\n\s*\n', content)

        scored_sections = []
        for section in sections:
            if len(section.strip()) < 50:
                continue

            section_lower = section.lower()
            score = sum(section_lower.count(term) for term in key_terms_lower)

            if re.search(r'\b(20)\d{2}\b', section):
                score += 1
            # "%" is not a word character, so it must not be followed by \b.
            if re.search(
                r'\b\d+([.,]\d+)?\s*(?:(?:million|billion|thousand|percent)\b|%)',
                section,
            ):
                score += 1

            scored_sections.append((score, section))

        scored_sections.sort(key=lambda pair: pair[0], reverse=True)
        top_sections = [section for score, section in scored_sections[:3] if score > 0]

        return '\n\n'.join(top_sections) if top_sections else '\n\n'.join(sections[:2])

    def run(self, question: str) -> str:
        """Answer a question from Wikipedia and return the formatted excerpts.

        Entities are extracted once and reused for section scoring. When no
        entity is found, the punctuation-stripped question itself is searched.

        Returns:
            A string starting with ``"ENHANCED WIKIPEDIA RESULTS:"``, a
            "No ... found" message, or ``"Wikipedia tool error: ..."`` on
            unexpected failure.
        """
        try:
            entities = self._extract_entities(question)
            if not entities:
                cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
                result = self.base_tool.run(cleaned_question)
                if not result:
                    return "No Wikipedia results found."
                return self._extract_relevant_sections(result, question, entities)

            search_results = self._search_multiple_terms(entities)
            if not search_results:
                return "No relevant Wikipedia articles found."

            formatted_results = []
            for entity, content in search_results.items():
                relevant_content = self._extract_relevant_sections(
                    content, question, entities
                )
                if relevant_content:
                    formatted_results.append(f"=== {entity} ===\n{relevant_content}")

            if not formatted_results:
                return "No relevant info found."
            return "ENHANCED WIKIPEDIA RESULTS:\n\n" + "\n\n".join(formatted_results)
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            return f"Wikipedia tool error: {str(e)}"
|
|
| |
| |
| |
def excel_to_markdown(
    excel_path: str,
    sheet_name: Optional[str] = None,
    max_full_rows: int = 300,
    preview_rows: int = 50,
) -> str:
    """Read an Excel or CSV file and render it as a markdown report.

    The report starts with file metadata (name, dimensions, column names and
    mean/min/max for the first three numeric columns) followed by the data
    as a markdown table.

    Args:
        excel_path: Path to the spreadsheet; ``~`` is expanded. Files with a
            ``.csv`` suffix are read with ``pandas.read_csv``, everything else
            with ``pandas.read_excel``.
        sheet_name: Sheet to read from an Excel file. A value made only of
            decimal digits is used as a zero-based sheet index; an empty or
            missing value selects the first sheet. Ignored for CSV files.
        max_full_rows: Datasets with at most this many rows are rendered in
            full.
        preview_rows: Number of leading rows rendered for larger datasets.

    Returns:
        The markdown report, or an ``"Error: ..."`` / ``"Error reading Excel
        file: ..."`` message when the file is missing or cannot be parsed.
    """

    def render(frame) -> str:
        # Prefer pandas' own renderer; fall back to tabulate if unavailable.
        if hasattr(frame, "to_markdown"):
            return frame.to_markdown(index=False)
        return tabulate(frame, headers="keys", tablefmt="github", showindex=False)

    try:
        file_path = Path(excel_path).expanduser().resolve()
        if not file_path.is_file():
            return f"Error: Excel file not found at {file_path}"

        if file_path.suffix.lower() == '.csv':
            df = pd.read_csv(file_path)
        else:
            sheet_text = "" if sheet_name is None else str(sheet_name)
            # isdecimal() only accepts characters that int() can parse.
            sheet = int(sheet_text) if sheet_text.isdecimal() else (sheet_name or 0)
            df = pd.read_excel(file_path, sheet_name=sheet)

        metadata = f"EXCEL FILE ANALYSIS:\nFile: {file_path.name}\nDimensions: {len(df)} rows × {len(df.columns)} columns\n"
        metadata += f"Columns: {', '.join(df.columns.astype(str).tolist())}\n"

        numeric_cols = df.select_dtypes(include=['number']).columns
        for col in numeric_cols[:3]:
            metadata += f" {col}: mean={df[col].mean():.2f}, min={df[col].min()}, max={df[col].max()}\n"

        if len(df) <= max_full_rows:
            metadata += "\nCOMPLETE DATASET TABLE:\n"
            return metadata + render(df)

        metadata += f"\nLARGE DATASET (First {preview_rows} structural items processed):\n"
        return (
            metadata
            + render(df.head(preview_rows))
            + f"\n\n... Truncation info: Sheet contains {len(df) - preview_rows} additional rows below this view."
        )

    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error reading Excel file: {str(e)}"
|
|
def _guess_image_mime_type(image_path: str) -> str:
    """Return the image MIME type implied by the file name, defaulting to JPEG."""
    mime_type, _ = mimetypes.guess_type(str(image_path))
    if mime_type and mime_type.startswith("image/"):
        return mime_type
    return "image/jpeg"


def image_file_info(image_path: str, question: str) -> str:
    """Ask the Gemini model a question about an image file.

    The image bytes are sent together with *question* to
    ``gemini-2.5-flash``; the MIME type is derived from the file extension
    (``image/jpeg`` when it cannot be determined). The API key is read from
    the ``GEMINI_API_KEY`` environment variable.

    Args:
        image_path: Path of the image to analyse.
        question: Prompt sent alongside the image.

    Returns:
        The model's text answer, or an ``"Error during image analysis: ..."``
        message when reading the file or calling the API fails or the model
        returns no text.
    """
    try:
        # Read the file first so a bad path fails before any client is built.
        with open(image_path, "rb") as f:
            img_bytes = f.read()

        # Imported lazily so the module stays usable without google-genai.
        from google import genai
        from google.genai.types import Part

        client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=[
                question,
                Part.from_bytes(
                    data=img_bytes,
                    mime_type=_guess_image_mime_type(image_path),
                ),
            ],
        )
        if not response.text:
            return "Error during image analysis: model returned no text."
        return response.text
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error during image analysis: {e}"
|
|
def audio_file_info(audio_path: str) -> str:
    """Transcribe an audio file with the module-level Whisper model.

    Args:
        audio_path: Path of the audio (or video) file to transcribe.

    Returns:
        The transcript with surrounding whitespace removed; a fixed notice
        when the transcript is empty or missing; or an error message when the
        model is not loaded or transcription raises.
    """
    if WHISPER_MODEL is None:
        return "Audio analysis engine failure: Whisper is uninstantiated."
    try:
        result = WHISPER_MODEL.transcribe(str(audio_path), fp16=False)
        # An empty transcript previously came back as "" because the 'text'
        # key is present even when nothing was recognised.
        transcript = (result.get('text') or '').strip()
        return transcript or 'Audio tracking complete. No language metadata detected.'
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error transcribing audio: {str(e)}"
|
|
| |
def code_file_read(file_path: str) -> str:
    """Read a source file and return it with a short summary header.

    The header lists the file name and line count. For ``.py`` files (suffix
    compared case-insensitively) it also counts top-level ``def`` /
    ``async def`` and ``class`` statements.

    Args:
        file_path: Path of the file to read. It is decoded as UTF-8; bytes
            that are not valid UTF-8 are replaced rather than aborting the
            read.

    Returns:
        The header followed by the file content, or an
        ``"Error reading code file: ..."`` message on failure.
    """
    try:
        source_path = Path(file_path)
        content = source_path.read_text(encoding="utf-8", errors="replace")
        info = f"CODE FILE ANALYSIS:\nFile: {source_path.name}\nLines: {len(content.splitlines())}\n"

        if source_path.suffix.lower() == '.py':
            # Anchored at column 0, so only top-level definitions are counted.
            func_count = len(re.findall(r'^(?:async\s+)?def\s+\w+', content, re.MULTILINE))
            class_count = len(re.findall(r'^class\s+\w+', content, re.MULTILINE))
            info += f"Functions: {func_count}, Classes: {class_count}\n"

        return f"{info}\nCODE CONTENT:\n{content}"
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error reading code file: {e}"
|
|
| |
def extract_youtube_info(url_or_text: str) -> str:
    """Download the audio of a YouTube video and return its transcript.

    The first ``youtube.com/watch?v=...`` or ``youtu.be/...`` URL found in
    *url_or_text* is used; when none is found the whole (stripped) text is
    handed to yt-dlp as the URL. The download goes into a temporary directory
    that is removed when the function returns, whether or not it succeeds.

    Args:
        url_or_text: A YouTube URL or text containing one.

    Returns:
        The transcript produced by ``audio_file_info``, or an error message
        when the download or transcription fails.
    """
    pattern = r"(https?://(?:www\.)?(?:youtube\.com/watch\?v=[\w\-]+|youtu\.be/[\w\-]+))"
    match = re.search(pattern, url_or_text)
    youtube_url = match.group(1) if match else url_or_text.strip()

    # Only used to name the temporary file.
    match_id = re.search(r"(?:v=|\/)([a-zA-Z0-9_-]{11})", youtube_url)
    video_id = match_id.group(1) if match_id else "downloaded_video"

    try:
        # A private temporary directory avoids name clashes between
        # concurrent calls and guarantees cleanup of partial downloads.
        with tempfile.TemporaryDirectory() as work_dir:
            file_output = os.path.join(work_dir, f"{video_id}.mp4")

            ydl_opts = {
                # Only the audio track is transcribed, so prefer an audio-only
                # stream and fall back to the previous video selection.
                'format': 'bestaudio/best[ext=mp4]/best',
                'outtmpl': file_output,
                'quiet': True,
                'no_warnings': True,
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([youtube_url])

            if os.path.exists(file_output):
                return audio_file_info(file_output)

            return "Error: Download completed but file path could not be resolved."
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error processing YouTube video: {e}"