from langchain.tools import DuckDuckGoSearchResults, WikipediaQueryRun
from langchain.utilities import WikipediaAPIWrapper
from PIL import Image
import re
import time
import json
import pandas as pd
from pathlib import Path
from typing import List, Dict, Optional, Union
from tabulate import tabulate
import whisper
import numpy as np
import os
from langchain_groq import ChatGroq
from youtube_transcript_api import YouTubeTranscriptApi
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model='gpt-4o', temperature=0)

class EnhancedSearchTool:
    """Enhanced web search with intelligent query processing and result filtering"""

    def __init__(self, max_results: int = 10):
        self.base_tool = DuckDuckGoSearchResults(num_results=max_results)
        self.max_results = max_results
    def _extract_key_terms(self, question: str) -> List[str]:
        """Extract key search terms from the question using LLM"""
        try:
            extract_prompt = f"""
            Extract the most important search terms from this question for web search:
            Question: {question}

            Return ONLY a comma-separated list of key terms, no explanations.
            Focus on: proper nouns, specific concepts, technical terms, dates, numbers.
            Avoid: common words like 'what', 'how', 'when', 'the', 'is', 'are'.

            Example: "What is the population of Tokyo in 2023?" -> "Tokyo population 2023"
            """

            response = llm.invoke(extract_prompt).content.strip()
            return [term.strip() for term in response.split(',')]
        except Exception:
            return self._simple_keyword_extraction(question)
    def _simple_keyword_extraction(self, question: str) -> List[str]:
        """Fallback keyword extraction using regex"""
        stop_words = {'what', 'how', 'when', 'where', 'why', 'who', 'which', 'the', 'is', 'are', 'was', 'were', 'do', 'does', 'did', 'can', 'could', 'should', 'would'}
        words = re.findall(r'\b[A-Za-z]+\b', question.lower())
        return [word for word in words if word not in stop_words and len(word) > 2]
    def _generate_search_queries(self, question: str) -> List[str]:
        """Generate multiple search queries for comprehensive results"""
        key_terms = self._extract_key_terms(question)

        queries = []

        cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
        queries.append(cleaned_question)

        if key_terms:
            queries.append(' '.join(key_terms[:5]))

        if any(word in question.lower() for word in ['latest', 'recent', 'current', 'new']):
            queries.append(f"{' '.join(key_terms[:3])} 2024 2025")

        if any(word in question.lower() for word in ['statistics', 'data', 'number', 'count']):
            queries.append(f"{' '.join(key_terms[:3])} statistics data")

        if any(word in question.lower() for word in ['definition', 'what is', 'meaning']):
            queries.append(f"{' '.join(key_terms[:2])} definition meaning")

        return list(dict.fromkeys(queries))
    def _filter_and_rank_results(self, results: List[Dict], question: str) -> List[Dict]:
        """Filter and rank search results based on relevance"""
        if not results:
            return results

        key_terms = self._extract_key_terms(question)
        key_terms_lower = [term.lower() for term in key_terms]

        scored_results = []
        for result in results:
            score = 0
            text_content = (result.get('snippet', '') + ' ' + result.get('title', '')).lower()

            for term in key_terms_lower:
                if term in text_content:
                    score += text_content.count(term)

            if any(year in text_content for year in ['2024', '2025', '2023']):
                score += 2

            if len(result.get('snippet', '')) < 50:
                score -= 1

            scored_results.append((score, result))

        scored_results.sort(key=lambda x: x[0], reverse=True)
        return [result for score, result in scored_results[:self.max_results]]
    def run(self, question: str) -> str:
        """Enhanced search execution with multiple queries and result filtering"""
        try:
            search_queries = self._generate_search_queries(question)
            all_results = []

            for query in search_queries[:3]:
                try:
                    results = self.base_tool.run(query)
                    if isinstance(results, str):
                        try:
                            results = json.loads(results) if results.startswith('[') else [{'snippet': results, 'title': 'Search Result'}]
                        except Exception:
                            results = [{'snippet': results, 'title': 'Search Result'}]

                    if isinstance(results, list):
                        all_results.extend(results)

                    time.sleep(0.5)
                except Exception as e:
                    print(f"Search query failed: {query} - {e}")
                    continue

            if not all_results:
                return "No search results found."

            filtered_results = self._filter_and_rank_results(all_results, question)

            formatted_results = []
            for i, result in enumerate(filtered_results[:5], 1):
                title = result.get('title', 'No title')
                snippet = result.get('snippet', 'No description')
                link = result.get('link', '')

                formatted_results.append(f"{i}. {title}\n   {snippet}\n   Source: {link}\n")

            return "ENHANCED SEARCH RESULTS:\n" + "\n".join(formatted_results)

        except Exception as e:
            return f"Enhanced search error: {str(e)}"


class EnhancedWikipediaTool:
    """Enhanced Wikipedia search with intelligent query processing and content extraction"""

    def __init__(self):
        self.base_wrapper = WikipediaAPIWrapper(
            top_k_results=3,
            doc_content_chars_max=3000,
            load_all_available_meta=True
        )
        self.base_tool = WikipediaQueryRun(api_wrapper=self.base_wrapper)
    def _extract_entities(self, question: str) -> List[str]:
        """Extract named entities for Wikipedia search"""
        try:
            entity_prompt = f"""
            Extract named entities (people, places, organizations, concepts) from this question for Wikipedia search:
            Question: {question}

            Return ONLY a comma-separated list of the most important entities.
            Focus on: proper nouns, specific names, places, organizations, historical events, scientific concepts.

            Example: "Tell me about Einstein's theory of relativity" -> "Albert Einstein, theory of relativity, relativity"
            """
            response = llm.invoke(entity_prompt).content.strip()
            print(f'inside extract_entities:{response}')
            entities = [entity.strip() for entity in response.split(',')]
            return [e for e in entities if len(e) > 2]
        except Exception:
            return self._extract_capitalized_terms(question)
    def _extract_capitalized_terms(self, question: str) -> List[str]:
        """Fallback: extract capitalized terms as potential entities"""
        capitalized_words = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', question)

        quoted_terms = re.findall(r'"([^"]+)"', question)
        quoted_terms.extend(re.findall(r"'([^']+)'", question))

        return capitalized_words + quoted_terms
    def _search_multiple_terms(self, entities: List[str]) -> Dict[str, str]:
        """Search Wikipedia for multiple entities and return best results"""
        results = {}

        for entity in entities[:3]:
            try:
                result = self.base_tool.run(entity)
                print(f'Inside _search_multiple_terms: {result}')
                if result and "Page:" in result and len(result) > 100:
                    results[entity] = result
                time.sleep(0.5)
            except Exception as e:
                print(f"Wikipedia search failed for '{entity}': {e}")
                continue

        return results
    def _extract_relevant_sections(self, content: str, question: str) -> str:
        """Extract the most relevant sections from Wikipedia content"""
        if not content or len(content) < 200:
            return content

        sections = re.split(r'\n\s*\n', content)
        print(f'Inside _extract relevant sections:{sections}')

        key_terms = self._extract_entities(question)
        key_terms_lower = [term.lower() for term in key_terms]

        scored_sections = []
        for section in sections:
            if len(section.strip()) < 500:
                continue

            score = 0
            section_lower = section.lower()

            for term in key_terms_lower:
                score += section_lower.count(term)

            if re.search(r'\b(19|20)\d{2}\b', section):
                score += 1
            if re.search(r'\b\d+([.,]\d+)?\s*(million|billion|thousand|percent|%)\b', section):
                score += 1

            scored_sections.append((score, section))

        scored_sections.sort(key=lambda x: x[0], reverse=True)
        top_sections = [section for score, section in scored_sections[:7] if score > 0]
        print(f'Inside extract relevant sections, top sections:{top_sections}')

        if not top_sections:
            top_sections = sections[:2]

        return '\n\n'.join(top_sections)
    def run(self, question: str) -> str:
        """Enhanced Wikipedia search with entity extraction and content filtering"""
        try:
            entities = self._extract_entities(question)

            if not entities:
                cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
                try:
                    result = self.base_tool.run(cleaned_question)
                    print(f'******************Inside run*************:{result} ')
                    return self._extract_relevant_sections(result, question) if result else "No Wikipedia results found."
                except Exception as e:
                    return f"Wikipedia search error: {str(e)}"

            search_results = self._search_multiple_terms(entities)

            if not search_results:
                return "No relevant Wikipedia articles found."

            formatted_results = []
            for entity, content in search_results.items():
                relevant_content = self._extract_relevant_sections(content, question)
                if relevant_content:
                    formatted_results.append(f"=== {entity} ===\n{relevant_content}")

            if not formatted_results:
                return "No relevant information found in Wikipedia articles."

            return "ENHANCED WIKIPEDIA RESULTS:\n\n" + "\n\n".join(formatted_results)

        except Exception as e:
            return f"Enhanced Wikipedia error: {str(e)}"


def excel_to_markdown(excel_path: str, sheet_name: Optional[str] = None) -> str:
    """Enhanced Excel tool with better error handling and data analysis"""
    try:
        file_path = Path(excel_path).expanduser().resolve()
        if not file_path.is_file():
            return f"Error: Excel file not found at {file_path}"

        sheet: Union[str, int] = (
            int(sheet_name) if sheet_name and sheet_name.isdigit() else sheet_name or 0
        )
        df = pd.read_excel(file_path, sheet_name=sheet)
        df = df.iloc[:, :-1]

        metadata = f"EXCEL FILE ANALYSIS:\n"
        metadata += f"File: {file_path.name}\n"
        metadata += f"Dimensions: {len(df)} rows × {len(df.columns)} columns\n"
        metadata += f"Columns: {', '.join(df.columns.tolist())}\n"
        metadata += f"Data types: {dict(df.dtypes)}\n"

        numeric_cols = df.select_dtypes(include=['number']).columns
        if len(numeric_cols) > 0:
            metadata += f"Numeric columns: {list(numeric_cols)}\n"
            for col in numeric_cols:
                metadata += f"  {col}: mean={df[col].mean():.2f}, min={df[col].min()}, max={df[col].max()}, sum={df[col].sum()}\n"
metadata += "\nSAMPLE DATA (first 10 rows):\n"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return metadata
|
|
|
|
|
|
except Exception as e:
|
|
|
return f"Error reading Excel file: {str(e)}"


import mimetypes

def image_file_info(image_path: str, question: str) -> str:
    """Enhanced image file analysis using Gemini API"""
    try:
        if not os.path.exists(image_path):
            return f"Error: Image file not found at {image_path}"

        try:
            import google.generativeai as genai
            from PIL import Image

            genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

            model = genai.GenerativeModel('gemini-1.5-flash')

            try:
                image = Image.open(image_path)

                if image.mode in ('RGBA', 'LA'):
                    # Flatten the alpha channel onto a white background
                    background = Image.new('RGB', image.size, (255, 255, 255))
                    background.paste(image, mask=image.split()[-1])
                    image = background
                elif image.mode != 'RGB':
                    image = image.convert('RGB')

            except Exception as img_error:
                return f"Error opening image: {img_error}"

            response = model.generate_content([question, image])

            return response.text
        except ImportError:
            try:
                from google import genai
                from google.genai import types

                client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))

                with open(image_path, "rb") as f:
                    img_bytes = f.read()

                mime_type, _ = mimetypes.guess_type(image_path)
                if mime_type is None or not mime_type.startswith('image/'):
                    if image_path.lower().endswith('.png'):
                        mime_type = "image/png"
                    else:
                        mime_type = "image/jpeg"

                response = client.models.generate_content(
                    model="gemini-1.5-flash",
                    contents=[
                        question,
                        types.Part.from_bytes(data=img_bytes, mime_type=mime_type)
                    ],
                )

                return response.text

            except Exception as new_sdk_error:
                return f"Error with both SDKs. New SDK error: {new_sdk_error}"

    except Exception as e:
        return f"Error during image analysis: {e}"


def audio_file_info(audio_path: str) -> str:
    """Returns only the transcription of an audio file."""
    try:
        model = whisper.load_model("tiny")
        result = model.transcribe(audio_path, fp16=False)
        return result['text']
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"


def code_file_read(code_path: str) -> str:
    """Enhanced code file analysis"""
    try:
        with open(code_path, "r", encoding="utf-8") as f:
            content = f.read()

        file_path = Path(code_path)

        info = f"CODE FILE ANALYSIS:\n"
        info += f"File: {file_path.name}\n"
        info += f"Extension: {file_path.suffix}\n"
        info += f"Size: {len(content)} characters, {len(content.splitlines())} lines\n"

        if file_path.suffix == '.py':
            import_lines = [line for line in content.splitlines() if line.strip().startswith(('import ', 'from '))]
            if import_lines:
                info += f"Imports ({len(import_lines)}): {', '.join(import_lines[:5])}\n"

            func_count = len(re.findall(r'^def\s+\w+', content, re.MULTILINE))
            class_count = len(re.findall(r'^class\s+\w+', content, re.MULTILINE))
            info += f"Functions: {func_count}, Classes: {class_count}\n"

        info += f"\nCODE CONTENT:\n{content}"
        return info

    except Exception as e:
        return f"Error reading code file: {e}"


import yt_dlp


def extract_youtube_info(question: str) -> Optional[str]:
    """
    Extract a YouTube URL from the question and return the video's transcript.

    Parameters:
    - question: str — text containing a YouTube URL (youtube.com/watch or youtu.be)

    Returns:
    - str: the full transcript as a single cleaned string, or None if no URL is
      found or no transcript is available
    """
    pattern = r"(https?://(?:www\.)?(?:youtube\.com/watch\?v=[\w\-]+|youtu\.be/[\w\-]+))"
    match = re.search(pattern, question)
    youtube_url = match.group(1) if match else None

    print(f"Extracting YouTube URL: {youtube_url}")
    if not youtube_url:
        return None

    try:
        video_id = re.search(r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})', youtube_url).group(1)

        transcript_list = YouTubeTranscriptApi.get_transcript(video_id)

        full_transcript = ' '.join([entry['text'] for entry in transcript_list])

        full_transcript = re.sub(r'\s+', ' ', full_transcript).strip()

        return full_transcript

    except Exception as e:
        print(f"Error getting transcript: {e}")
        return None
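
# Illustrative usage sketch (hypothetical URL). The YouTube link is expected to
# appear somewhere inside the question text; the return value is the plain
# transcript, or None when no URL or transcript is found:
#
#   transcript = extract_youtube_info(
#       "Summarise this video: https://www.youtube.com/watch?v=dQw4w9WgXcQ"
#   )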