import json
import mimetypes
import os
import re
import tempfile
import time
from pathlib import Path
from typing import Dict, List, Optional, Union

import numpy as np
import pandas as pd
import whisper
import yt_dlp
from langchain_community.tools import DuckDuckGoSearchResults, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from tabulate import tabulate

# ----------- Global Model Warmup -----------
# The Whisper model is loaded once at import time so that every transcription
# call reuses the same in-memory instance instead of re-reading it from disk.
# If loading fails the module still imports; audio_file_info() reports the
# problem when it is called.
try:
    print("Pre-loading Whisper model globally...")
    WHISPER_MODEL = whisper.load_model("tiny")
except Exception as e:
    print(f"Warning: Failed to load Whisper globally: {e}")
    WHISPER_MODEL = None


# ----------- Enhanced Search Functionality -----------
class EnhancedSearchTool:
    """Web search with query expansion, result parsing and relevance ranking.

    The tool wraps ``DuckDuckGoSearchResults``. For each question it derives
    up to three search queries, parses whatever text layout the base tool
    returns into ``{'snippet', 'title', 'link'}`` dicts, ranks them against
    the question's key terms and returns the top five as formatted text.
    """

    # Years whose presence marks a result as recent (+3) or fairly recent (+1).
    # Kept as class attributes so they can be updated in one place.
    RECENT_YEARS = ("2025", "2026")
    PREVIOUS_YEAR = "2024"

    _STOP_WORDS = frozenset({
        'what', 'how', 'when', 'where', 'why', 'who', 'which', 'the', 'is',
        'are', 'was', 'were', 'do', 'does', 'did', 'can', 'could', 'should',
        'would',
    })

    # Matches both "[snippet: a, title: b, link: c], [...]" and the
    # unbracketed "snippet: a, title: b, link: c, snippet: ..." layout.
    _RESULT_PATTERN = re.compile(
        r'snippet:\s*(.*?),\s*title:\s*(.*?),\s*link:\s*(.*?)'
        r'(?=\]|,\s*snippet:|\Z)',
        re.DOTALL,
    )

    def __init__(self, llm=None, max_results: int = 10):
        """Create the tool.

        Args:
            llm: Optional chat model exposing ``invoke(prompt).content``; used
                for key-term extraction. When ``None`` a regex fallback is used.
            max_results: Number of results requested from the base tool and
                the maximum number kept after ranking.
        """
        self.base_tool = DuckDuckGoSearchResults(num_results=max_results)
        self.max_results = max_results
        self.llm = llm

    def _extract_key_terms(self, question: str) -> List[str]:
        """Return the key search terms of ``question``.

        Uses the LLM when one is configured and falls back to
        ``_simple_keyword_extraction`` when the LLM is missing, fails, or
        returns no usable term.
        """
        if self.llm:
            try:
                extract_prompt = (
                    "Extract the most important search terms from this question for web search:\n"
                    f"Question: {question}\n"
                    "Return ONLY a comma-separated list of key terms, no explanations.\n"
                    "Focus on: proper nouns, specific concepts, technical terms, dates, numbers.\n"
                    "Avoid: common words like 'what', 'how', 'when', 'the', 'is', 'are'.\n"
                    'Example: "What is the population of Tokyo in 2025?" -> "Tokyo, population, 2025"\n'
                )
                response = self.llm.invoke(extract_prompt).content.strip()
                # Drop wrapping quotes and empty pieces: an empty term would
                # match every text and inflate ranking scores.
                terms = [
                    piece.strip().strip("\"'").strip()
                    for piece in response.split(',')
                ]
                terms = [term for term in terms if term]
                if terms:
                    return terms
            except Exception as e:
                print(f"LLM keyword extraction failed, using fallback: {e}")
        return self._simple_keyword_extraction(question)

    def _simple_keyword_extraction(self, question: str) -> List[str]:
        """Fallback keyword extraction using a regex and a stop-word list.

        Alphanumeric tokens longer than two characters are kept, so years
        such as ``2025`` survive as key terms.
        """
        words = re.findall(r'\b[A-Za-z0-9]+\b', question.lower())
        return [
            word for word in words
            if word not in self._STOP_WORDS and len(word) > 2
        ]

    def _generate_search_queries(
        self, question: str, key_terms: Optional[List[str]] = None
    ) -> List[str]:
        """Build a de-duplicated, ordered list of search queries.

        Args:
            question: The user question.
            key_terms: Pre-computed key terms; extracted here when ``None``.

        Returns:
            The cleaned question, the joined key terms, and optional
            recency/statistics/definition variants, without duplicates.
        """
        if key_terms is None:
            key_terms = self._extract_key_terms(question)

        queries = []
        cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
        if cleaned_question:
            queries.append(cleaned_question)

        # Variant queries are only meaningful when there are terms to vary.
        if key_terms:
            lowered = question.lower()
            queries.append(' '.join(key_terms[:5]))
            if any(word in lowered for word in ['latest', 'recent', 'current', 'new']):
                queries.append(
                    f"{' '.join(key_terms[:3])} {' '.join(self.RECENT_YEARS)}"
                )
            if any(word in lowered for word in ['statistics', 'data', 'number', 'count']):
                queries.append(f"{' '.join(key_terms[:3])} statistics data")
            if any(word in lowered for word in ['definition', 'what is', 'meaning']):
                queries.append(f"{' '.join(key_terms[:2])} definition meaning")

        # dict.fromkeys removes duplicates while preserving order.
        return list(dict.fromkeys(queries))

    def _filter_and_rank_results(
        self,
        results: List[Dict],
        question: str,
        key_terms: Optional[List[str]] = None,
    ) -> List[Dict]:
        """Rank results by key-term frequency, recency and snippet length.

        Args:
            results: Parsed result dicts.
            question: The user question (used only when ``key_terms`` is None).
            key_terms: Pre-computed key terms, to avoid a second extraction.

        Returns:
            At most ``max_results`` results, best first. Ties keep their
            original order.
        """
        if not results:
            return results
        if key_terms is None:
            key_terms = self._extract_key_terms(question)
        key_terms_lower = [term.lower() for term in key_terms if term and term.strip()]

        scored_results = []
        for result in results:
            if not isinstance(result, dict):
                continue
            snippet = str(result.get('snippet') or '')
            title = str(result.get('title') or '')
            text_content = f"{snippet} {title}".lower()

            score = sum(text_content.count(term) for term in key_terms_lower)
            if any(year in text_content for year in self.RECENT_YEARS):
                score += 3
            elif self.PREVIOUS_YEAR in text_content:
                score += 1
            if len(snippet) < 50:
                score -= 1
            scored_results.append((score, result))

        scored_results.sort(key=lambda pair: pair[0], reverse=True)
        return [result for _, result in scored_results[:self.max_results]]

    def _parse_results(self, raw) -> List[Dict]:
        """Convert the base tool's output into a list of result dicts.

        Accepts a list of dicts, a JSON array string, or LangChain's
        ``snippet: ..., title: ..., link: ...`` text layout. Unparseable
        non-empty text becomes a single generic result.
        """
        if isinstance(raw, list):
            return [item for item in raw if isinstance(item, dict)]
        if not isinstance(raw, str) or not raw.strip():
            return []

        text = raw.strip()
        if text.startswith('['):
            try:
                loaded = json.loads(text)
            except ValueError:
                # Not JSON (the bracketed text layout also starts with '[').
                loaded = None
            if isinstance(loaded, list):
                parsed = [item for item in loaded if isinstance(item, dict)]
                if parsed:
                    return parsed

        items = self._RESULT_PATTERN.findall(text)
        if items:
            return [
                {'snippet': snippet.strip(), 'title': title.strip(), 'link': link.strip()}
                for snippet, title, link in items
            ]
        return [{'snippet': raw, 'title': 'Search Result'}]

    def run(self, question: str) -> str:
        """Search the web for ``question`` and return formatted top results.

        Returns:
            A text block starting with ``ENHANCED SEARCH RESULTS:``, the
            string ``"No search results found."``, or an error message.
            Exceptions are reported in the returned string, never raised.
        """
        try:
            # Extract once and reuse for both query generation and ranking.
            key_terms = self._extract_key_terms(question)
            search_queries = self._generate_search_queries(question, key_terms)

            all_results = []
            seen = set()
            for query in search_queries[:3]:
                try:
                    for item in self._parse_results(self.base_tool.run(query)):
                        # Different queries often return the same page.
                        identity = str(item.get('link') or '') or (
                            f"{item.get('title')}|{item.get('snippet')}"
                        )
                        if identity in seen:
                            continue
                        seen.add(identity)
                        all_results.append(item)
                    time.sleep(0.5)  # be polite to the search backend
                except Exception as e:
                    print(f"Search query failed: {query} - {e}")
                    continue

            if not all_results:
                return "No search results found."

            filtered_results = self._filter_and_rank_results(
                all_results, question, key_terms
            )
            formatted_results = []
            for i, result in enumerate(filtered_results[:5], 1):
                title = result.get('title', 'No title')
                snippet = result.get('snippet', 'No description')
                link = result.get('link', 'N/A')
                formatted_results.append(
                    f"{i}. {title}\n   {snippet}\n   Source: {link}\n"
                )
            return "ENHANCED SEARCH RESULTS:\n" + "\n".join(formatted_results)
        except Exception as e:
            return f"Enhanced search error: {str(e)}"


# ----------- Enhanced Wikipedia Tool -----------
class EnhancedWikipediaTool:
    """Wikipedia lookup with entity extraction and relevant-section selection."""

    # Capitalised sentence openers that are not entities worth looking up.
    _QUESTION_WORDS = frozenset({
        'what', 'who', 'whom', 'whose', 'when', 'where', 'why', 'which',
        'how', 'tell', 'is', 'are', 'was', 'were', 'do', 'does', 'did',
        'can', 'could', 'should', 'would', 'the', 'give', 'list', 'name',
    })
    _CAPITALIZED_PATTERN = re.compile(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b')
    _YEAR_PATTERN = re.compile(r'\b(20)\d{2}\b')
    # '%' is not a word character, so it must not be followed by \b.
    _QUANTITY_PATTERN = re.compile(
        r'\b\d+(?:[.,]\d+)?\s*(?:(?:million|billion|thousand|percent)\b|%)'
    )

    def __init__(self, llm=None):
        """Create the tool.

        Args:
            llm: Optional chat model exposing ``invoke(prompt).content``; used
                for entity extraction. When ``None`` a regex fallback is used.
        """
        self.base_wrapper = WikipediaAPIWrapper(
            top_k_results=3,
            doc_content_chars_max=3000,
            load_all_available_meta=True,
        )
        self.base_tool = WikipediaQueryRun(api_wrapper=self.base_wrapper)
        self.llm = llm

    def _extract_entities(self, question: str) -> List[str]:
        """Return named entities from ``question`` for Wikipedia lookup.

        Uses the LLM when configured; on failure, or without an LLM, falls
        back to ``_extract_capitalized_terms``. LLM entities of two
        characters or fewer are discarded.
        """
        if self.llm:
            try:
                entity_prompt = (
                    "Extract named entities (people, places, organizations, concepts) "
                    "from this question for Wikipedia search:\n"
                    f"Question: {question}\n"
                    "Return ONLY a comma-separated list of the most important entities.\n"
                    "Focus on: proper nouns, specific names, places, organizations, "
                    "historical events, scientific concepts.\n"
                    "Example: \"Tell me about Einstein's theory of relativity\" -> "
                    "\"Albert Einstein, theory of relativity\"\n"
                )
                response = self.llm.invoke(entity_prompt).content.strip()
                entities = [
                    piece.strip().strip("\"'").strip()
                    for piece in response.split(',')
                ]
                return [entity for entity in entities if len(entity) > 2]
            except Exception as e:
                print(f"LLM entity extraction failed: {e}")
        return self._extract_capitalized_terms(question)

    def _extract_capitalized_terms(self, question: str) -> List[str]:
        """Fallback: capitalised word runs and quoted phrases as entities.

        Bare question words (e.g. a sentence-initial "What") are dropped so
        they do not use up one of the limited lookup slots, and duplicates
        are removed while preserving order.
        """
        capitalized_words = self._CAPITALIZED_PATTERN.findall(question)
        quoted_terms = re.findall(r'"([^"]+)"', question)
        # Lookarounds keep apostrophes inside words ("Einstein's") from being
        # treated as quote delimiters.
        quoted_terms.extend(re.findall(r"(?<!\w)'([^']+)'(?!\w)", question))

        candidates = [
            term for term in capitalized_words + quoted_terms
            if term.strip().lower() not in self._QUESTION_WORDS
        ]
        return list(dict.fromkeys(candidates))

    def _search_multiple_terms(self, entities: List[str]) -> Dict[str, str]:
        """Look up the first three entities and return usable page texts.

        A result is kept only when it contains ``"Page:"`` and is longer than
        100 characters. Failed lookups are logged and skipped.
        """
        results = {}
        for entity in entities[:3]:
            try:
                result = self.base_tool.run(entity)
                if result and "Page:" in result and len(result) > 100:
                    results[entity] = result
                time.sleep(0.5)  # throttle consecutive Wikipedia requests
            except Exception as e:
                print(f"Wikipedia search failed for '{entity}': {e}")
        return results

    def _extract_relevant_sections(
        self,
        content: str,
        question: str,
        key_terms: Optional[List[str]] = None,
    ) -> str:
        """Pick the paragraphs of ``content`` most relevant to the question.

        Args:
            content: Raw article text.
            question: The user question (used only when ``key_terms`` is None).
            key_terms: Pre-computed entities, to avoid re-extracting them for
                every article.

        Returns:
            Up to three positively scored paragraphs joined by blank lines;
            the first two paragraphs when nothing scores; ``content``
            unchanged when it is empty or shorter than 200 characters.
        """
        if not content or len(content) < 200:
            return content
        if key_terms is None:
            key_terms = self._extract_entities(question)
        key_terms_lower = [term.lower() for term in key_terms if term and term.strip()]

        sections = re.split(r'\n\s*\n', content)
        scored_sections = []
        for section in sections:
            if len(section.strip()) < 50:
                continue
            section_lower = section.lower()
            score = sum(section_lower.count(term) for term in key_terms_lower)
            if self._YEAR_PATTERN.search(section):
                score += 1
            if self._QUANTITY_PATTERN.search(section):
                score += 1
            scored_sections.append((score, section))

        scored_sections.sort(key=lambda pair: pair[0], reverse=True)
        top_sections = [
            section for score, section in scored_sections[:3] if score > 0
        ]
        if top_sections:
            return '\n\n'.join(top_sections)
        return '\n\n'.join(sections[:2])

    def run(self, question: str) -> str:
        """Answer ``question`` from Wikipedia and return formatted excerpts.

        Returns:
            A text block starting with ``ENHANCED WIKIPEDIA RESULTS:``, a
            "no results" message, or an error message. Exceptions are
            reported in the returned string, never raised.
        """
        try:
            entities = self._extract_entities(question)
            if not entities:
                # No entity found: query Wikipedia with the cleaned question.
                cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
                result = self.base_tool.run(cleaned_question)
                if not result:
                    return "No Wikipedia results found."
                return self._extract_relevant_sections(result, question, entities)

            search_results = self._search_multiple_terms(entities)
            if not search_results:
                return "No relevant Wikipedia articles found."

            formatted_results = []
            for entity, content in search_results.items():
                relevant_content = self._extract_relevant_sections(
                    content, question, entities
                )
                if relevant_content:
                    formatted_results.append(f"=== {entity} ===\n{relevant_content}")

            if not formatted_results:
                return "No relevant info found."
            return "ENHANCED WIKIPEDIA RESULTS:\n\n" + "\n\n".join(formatted_results)
        except Exception as e:
            return f"Wikipedia tool error: {str(e)}"


# ----------- Enhanced File Processing Tools -----------
def _render_table(df: "pd.DataFrame") -> str:
    """Render ``df`` as a markdown table without the index column."""
    if hasattr(df, "to_markdown"):
        return df.to_markdown(index=False)
    return tabulate(df, headers="keys", tablefmt="github", showindex=False)


# NOTE: the signature takes plain positional arguments (not an 'inputs' dict)
# so that it matches how the agent pipeline calls it.
def excel_to_markdown(excel_path: str, sheet_name: Optional[str] = None) -> str:
    """Read an Excel or CSV file and return a markdown summary of it.

    Args:
        excel_path: Path to the ``.csv`` or Excel file.
        sheet_name: Sheet name, or a digit string used as a sheet index.
            Defaults to the first sheet. Ignored for CSV files.

    Returns:
        File metadata plus the full table when the file has at most 300 rows,
        otherwise the first 50 rows and a truncation note. On failure an
        error message string is returned instead of raising.
    """
    try:
        file_path = Path(excel_path).expanduser().resolve()
        if not file_path.is_file():
            return f"Error: Excel file not found at {file_path}"

        # A digit string selects a sheet by position, anything else by name.
        if sheet_name and str(sheet_name).isdigit():
            sheet = int(sheet_name)
        else:
            sheet = sheet_name or 0

        if file_path.suffix.lower() == '.csv':
            df = pd.read_csv(file_path)
        else:
            df = pd.read_excel(file_path, sheet_name=sheet)

        metadata = (
            f"EXCEL FILE ANALYSIS:\nFile: {file_path.name}\n"
            f"Dimensions: {len(df)} rows × {len(df.columns)} columns\n"
        )
        metadata += f"Columns: {', '.join(df.columns.astype(str).tolist())}\n"

        # Quick statistics for the first three numeric columns.
        numeric_cols = df.select_dtypes(include=['number']).columns
        for col in numeric_cols[:3]:
            metadata += (
                f"  {col}: mean={df[col].mean():.2f}, "
                f"min={df[col].min()}, max={df[col].max()}\n"
            )

        # Small datasets are returned in full so downstream calculations see
        # every row; larger ones are previewed.
        if len(df) <= 300:
            metadata += "\nCOMPLETE DATASET TABLE:\n"
            return metadata + _render_table(df)

        metadata += "\nLARGE DATASET (First 50 structural items processed):\n"
        return (
            metadata
            + _render_table(df.head(50))
            + f"\n\n... Truncation info: Sheet contains {len(df) - 50} additional rows below this view."
        )
    except Exception as e:
        return f"Error reading Excel file: {str(e)}"


def image_file_info(image_path: str, question: str) -> str:
    """Ask the Gemini model ``question`` about the image at ``image_path``.

    The API key is read from the ``GEMINI_API_KEY`` environment variable.
    The MIME type is guessed from the file name and falls back to
    ``image/jpeg`` when it cannot be determined.

    Returns:
        The model's text answer, or an error message string on failure.
    """
    try:
        from google import genai
        from google.genai.types import Part

        client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
        with open(image_path, "rb") as f:
            img_bytes = f.read()

        guessed_type, _ = mimetypes.guess_type(str(image_path))
        if not guessed_type or not guessed_type.startswith("image/"):
            guessed_type = "image/jpeg"

        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=[
                question,
                Part.from_bytes(data=img_bytes, mime_type=guessed_type),
            ],
        )
        return response.text
    except Exception as e:
        return f"Error during image analysis: {e}"


def audio_file_info(audio_path: str) -> str:
    """Transcribe ``audio_path`` with the globally loaded Whisper model.

    Returns:
        The transcribed text, a placeholder message when the result has no
        ``'text'`` entry, or an error message string on failure.
    """
    if not WHISPER_MODEL:
        return "Audio analysis engine failure: Whisper is uninstantiated."
    try:
        result = WHISPER_MODEL.transcribe(str(audio_path), fp16=False)
        return result.get('text', 'Audio tracking complete. No language metadata detected.')
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"


# NOTE: the parameter is named file_path to match how the agent calls this tool.
def code_file_read(file_path: str) -> str:
    """Read a source file and return its content with a short summary.

    For ``.py`` files the summary also counts top-level ``def`` and ``class``
    statements (those starting at column 0).

    Returns:
        The summary followed by the file content, or an error message string
        on failure.
    """
    try:
        resolved_path = Path(file_path)
        with open(resolved_path, "r", encoding="utf-8") as f:
            content = f.read()

        info = (
            f"CODE FILE ANALYSIS:\nFile: {resolved_path.name}\n"
            f"Lines: {len(content.splitlines())}\n"
        )
        if resolved_path.suffix == '.py':
            func_count = len(re.findall(r'^def\s+\w+', content, re.MULTILINE))
            class_count = len(re.findall(r'^class\s+\w+', content, re.MULTILINE))
            info += f"Functions: {func_count}, Classes: {class_count}\n"
        return f"{info}\nCODE CONTENT:\n{content}"
    except Exception as e:
        return f"Error reading code file: {e}"


_YOUTUBE_URL_PATTERN = re.compile(
    r"(https?://(?:www\.|m\.)?"
    r"(?:youtube\.com/(?:watch\?v=|shorts/)[\w\-]+|youtu\.be/[\w\-]+))"
)


# NOTE: the parameter is named url_or_text because agents pass either a bare
# URL or a whole question that contains one.
def extract_youtube_info(url_or_text: str) -> str:
    """Download a YouTube video's audio and return its transcription.

    Args:
        url_or_text: A YouTube URL, or free text containing one. When no
            YouTube URL is recognised the stripped text itself is handed to
            yt-dlp.

    Returns:
        The transcription from ``audio_file_info``, or an error message
        string on failure.
    """
    match = _YOUTUBE_URL_PATTERN.search(url_or_text or "")
    youtube_url = match.group(1) if match else (url_or_text or "").strip()
    if not youtube_url:
        return "Error processing YouTube video: no URL provided"

    try:
        # A temporary directory avoids name clashes in the working directory
        # and guarantees cleanup, including partial downloads on failure.
        with tempfile.TemporaryDirectory() as tmp_dir:
            ydl_opts = {
                # Only the audio track is needed for transcription.
                'format': 'bestaudio/best',
                'outtmpl': os.path.join(tmp_dir, '%(id)s.%(ext)s'),
                'noplaylist': True,
                'quiet': True,
                'no_warnings': True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([youtube_url])

            downloaded = [
                path for path in sorted(Path(tmp_dir).iterdir())
                if path.is_file() and path.suffix != '.part'
            ]
            if not downloaded:
                return "Error: Download completed but file path could not be resolved."
            # Transcribe before the directory (and the file) is removed.
            return audio_file_info(str(downloaded[0]))
    except Exception as e:
        return f"Error processing YouTube video: {e}"