|
|
from langchain.tools import DuckDuckGoSearchResults, WikipediaQueryRun |
|
|
from langchain.utilities import WikipediaAPIWrapper |
|
|
from PIL import Image |
|
|
import re |
|
|
import time |
|
|
import json |
|
|
import pandas as pd |
|
|
from pathlib import Path |
|
|
from typing import List, Dict, Optional, Union |
|
|
from tabulate import tabulate |
|
|
import whisper |
|
|
|
|
|
import numpy as np |
|
|
import os |
|
|
|
|
|
from youtube_transcript_api import YouTubeTranscriptApi |
|
|
import re |
|
|
|
|
|
from langchain_openai import ChatOpenAI |
|
|
llm=ChatOpenAI(model='gpt-4o', temperature=0) |
|
|
|
|
|
|
|
|
class EnhancedSearchTool: |
|
|
"""Enhanced web search with intelligent query processing and result filtering""" |
|
|
|
|
|
def __init__(self, max_results: int = 10): |
|
|
self.base_tool = DuckDuckGoSearchResults(num_results=max_results) |
|
|
self.max_results = max_results |
|
|
|
|
|
def _extract_key_terms(self, question: str) -> List[str]: |
|
|
"""Extract key search terms from the question using LLM""" |
|
|
try: |
|
|
extract_prompt = f""" |
|
|
Extract the most important search terms from this question for web search: |
|
|
Question: {question} |
|
|
|
|
|
Return ONLY a comma-separated list of key terms, no explanations. |
|
|
Focus on: proper nouns, specific concepts, technical terms, dates, numbers. |
|
|
Avoid: common words like 'what', 'how', 'when', 'the', 'is', 'are'. |
|
|
|
|
|
Example: "What is the population of Tokyo in 2023?" -> "Tokyo population 2023" |
|
|
""" |
|
|
|
|
|
response = llm.invoke(extract_prompt).content.strip() |
|
|
return [term.strip() for term in response.split(',')] |
|
|
except Exception: |
|
|
|
|
|
return self._simple_keyword_extraction(question) |
|
|
|
|
|
def _simple_keyword_extraction(self, question: str) -> List[str]: |
|
|
"""Fallback keyword extraction using regex""" |
|
|
|
|
|
stop_words = {'what', 'how', 'when', 'where', 'why', 'who', 'which', 'the', 'is', 'are', 'was', 'were', 'do', 'does', 'did', 'can', 'could', 'should', 'would'} |
|
|
words = re.findall(r'\b[A-Za-z]+\b', question.lower()) |
|
|
return [word for word in words if word not in stop_words and len(word) > 2] |
|
|
|
|
|
def _generate_search_queries(self, question: str) -> List[str]: |
|
|
"""Generate multiple search queries for comprehensive results""" |
|
|
key_terms = self._extract_key_terms(question) |
|
|
|
|
|
queries = [] |
|
|
|
|
|
|
|
|
cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip() |
|
|
queries.append(cleaned_question) |
|
|
|
|
|
|
|
|
if key_terms: |
|
|
queries.append(' '.join(key_terms[:5])) |
|
|
|
|
|
|
|
|
if any(word in question.lower() for word in ['latest', 'recent', 'current', 'new']): |
|
|
queries.append(f"{' '.join(key_terms[:3])} 2024 2025") |
|
|
|
|
|
if any(word in question.lower() for word in ['statistics', 'data', 'number', 'count']): |
|
|
queries.append(f"{' '.join(key_terms[:3])} statistics data") |
|
|
|
|
|
if any(word in question.lower() for word in ['definition', 'what is', 'meaning']): |
|
|
queries.append(f"{' '.join(key_terms[:2])} definition meaning") |
|
|
|
|
|
return list(dict.fromkeys(queries)) |
|
|
|
|
|
def _filter_and_rank_results(self, results: List[Dict], question: str) -> List[Dict]: |
|
|
"""Filter and rank search results based on relevance""" |
|
|
if not results: |
|
|
return results |
|
|
|
|
|
key_terms = self._extract_key_terms(question) |
|
|
key_terms_lower = [term.lower() for term in key_terms] |
|
|
|
|
|
scored_results = [] |
|
|
for result in results: |
|
|
score = 0 |
|
|
text_content = (result.get('snippet', '') + ' ' + result.get('title', '')).lower() |
|
|
|
|
|
|
|
|
for term in key_terms_lower: |
|
|
if term in text_content: |
|
|
score += text_content.count(term) |
|
|
|
|
|
|
|
|
if any(year in text_content for year in ['2024', '2025', '2023']): |
|
|
score += 2 |
|
|
|
|
|
|
|
|
if len(result.get('snippet', '')) < 50: |
|
|
score -= 1 |
|
|
|
|
|
scored_results.append((score, result)) |
|
|
|
|
|
|
|
|
scored_results.sort(key=lambda x: x[0], reverse=True) |
|
|
return [result for score, result in scored_results[:self.max_results]] |
|
|
|
|
|
def run(self, question: str) -> str: |
|
|
"""Enhanced search execution with multiple queries and result filtering""" |
|
|
try: |
|
|
search_queries = self._generate_search_queries(question) |
|
|
all_results = [] |
|
|
|
|
|
for query in search_queries[:3]: |
|
|
try: |
|
|
results = self.base_tool.run(query) |
|
|
if isinstance(results, str): |
|
|
|
|
|
try: |
|
|
results = json.loads(results) if results.startswith('[') else [{'snippet': results, 'title': 'Search Result'}] |
|
|
except: |
|
|
results = [{'snippet': results, 'title': 'Search Result'}] |
|
|
|
|
|
if isinstance(results, list): |
|
|
all_results.extend(results) |
|
|
|
|
|
time.sleep(0.5) |
|
|
except Exception as e: |
|
|
print(f"Search query failed: {query} - {e}") |
|
|
continue |
|
|
|
|
|
if not all_results: |
|
|
return "No search results found." |
|
|
|
|
|
|
|
|
filtered_results = self._filter_and_rank_results(all_results, question) |
|
|
|
|
|
|
|
|
formatted_results = [] |
|
|
for i, result in enumerate(filtered_results[:5], 1): |
|
|
title = result.get('title', 'No title') |
|
|
snippet = result.get('snippet', 'No description') |
|
|
link = result.get('link', '') |
|
|
|
|
|
formatted_results.append(f"{i}. {title}\n {snippet}\n Source: {link}\n") |
|
|
|
|
|
return "ENHANCED SEARCH RESULTS:\n" + "\n".join(formatted_results) |
|
|
|
|
|
except Exception as e: |
|
|
return f"Enhanced search error: {str(e)}" |
|
|
|
|
|
|
|
|
|
|
|
class EnhancedWikipediaTool: |
|
|
"""Enhanced Wikipedia search with intelligent query processing and content extraction""" |
|
|
|
|
|
def __init__(self): |
|
|
self.base_wrapper = WikipediaAPIWrapper( |
|
|
top_k_results=3, |
|
|
doc_content_chars_max=3000, |
|
|
load_all_available_meta=True |
|
|
) |
|
|
self.base_tool = WikipediaQueryRun(api_wrapper=self.base_wrapper) |
|
|
|
|
|
def _extract_entities(self, question: str) -> List[str]: |
|
|
"""Extract named entities for Wikipedia search""" |
|
|
try: |
|
|
entity_prompt = f""" |
|
|
Extract named entities (people, places, organizations, concepts) from this question for Wikipedia search: |
|
|
Question: {question} |
|
|
|
|
|
Return ONLY a comma-separated list of the most important entities. |
|
|
Focus on: proper nouns, specific names, places, organizations, historical events, scientific concepts. |
|
|
|
|
|
Example: "Tell me about Einstein's theory of relativity" -> "Albert Einstein, theory of relativity, relativity" |
|
|
""" |
|
|
response = llm.invoke(entity_prompt).content.strip() |
|
|
print(f'inside extract_entities:{response}') |
|
|
entities = [entity.strip() for entity in response.split(',')] |
|
|
return [e for e in entities if len(e) > 2] |
|
|
except Exception: |
|
|
|
|
|
return self._extract_capitalized_terms(question) |
|
|
|
|
|
def _extract_capitalized_terms(self, question: str) -> List[str]: |
|
|
"""Fallback: extract capitalized terms as potential entities""" |
|
|
|
|
|
capitalized_words = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', question) |
|
|
|
|
|
quoted_terms = re.findall(r'"([^"]+)"', question) |
|
|
quoted_terms.extend(re.findall(r"'([^']+)'", question)) |
|
|
|
|
|
return capitalized_words + quoted_terms |
|
|
|
|
|
def _search_multiple_terms(self, entities: List[str]) -> Dict[str, str]: |
|
|
"""Search Wikipedia for multiple entities and return best results""" |
|
|
results = {} |
|
|
|
|
|
for entity in entities[:3]: |
|
|
try: |
|
|
result = self.base_tool.run(entity) |
|
|
print(f'Inside _search_multiple_terms: {result}') |
|
|
if result and "Page:" in result and len(result) > 100: |
|
|
results[entity] = result |
|
|
time.sleep(0.5) |
|
|
except Exception as e: |
|
|
print(f"Wikipedia search failed for '{entity}': {e}") |
|
|
continue |
|
|
|
|
|
return results |
|
|
|
|
|
def _extract_relevant_sections(self, content: str, question: str) -> str: |
|
|
"""Extract the most relevant sections from Wikipedia content""" |
|
|
if not content or len(content) < 200: |
|
|
return content |
|
|
|
|
|
|
|
|
sections = re.split(r'\n\s*\n', content) |
|
|
print(f'Inside _extract relevant sections:{sections}') |
|
|
|
|
|
|
|
|
key_terms = self._extract_entities(question) |
|
|
key_terms_lower = [term.lower() for term in key_terms] |
|
|
|
|
|
scored_sections = [] |
|
|
for section in sections: |
|
|
if len(section.strip()) < 500: |
|
|
continue |
|
|
|
|
|
score = 0 |
|
|
section_lower = section.lower() |
|
|
|
|
|
|
|
|
for term in key_terms_lower: |
|
|
score += section_lower.count(term) |
|
|
|
|
|
|
|
|
if re.search(r'\b(19|20)\d{2}\b', section): |
|
|
score += 1 |
|
|
if re.search(r'\b\d+([.,]\d+)?\s*(million|billion|thousand|percent|%)\b', section): |
|
|
score += 1 |
|
|
|
|
|
scored_sections.append((score, section)) |
|
|
|
|
|
|
|
|
scored_sections.sort(key=lambda x: x[0], reverse=True) |
|
|
top_sections = [section for score, section in scored_sections[:7] if score > 0] |
|
|
print(f'Inside extract relevant sections, top sections:{top_sections}') |
|
|
|
|
|
if not top_sections: |
|
|
|
|
|
top_sections = sections[:2] |
|
|
|
|
|
return '\n\n'.join(top_sections) |
|
|
|
|
|
def run(self, question: str) -> str: |
|
|
"""Enhanced Wikipedia search with entity extraction and content filtering""" |
|
|
try: |
|
|
entities = self._extract_entities(question) |
|
|
|
|
|
if not entities: |
|
|
|
|
|
cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip() |
|
|
try: |
|
|
result = self.base_tool.run(cleaned_question) |
|
|
print(f'******************Inside run*************:{result} ') |
|
|
return self._extract_relevant_sections(result, question) if result else "No Wikipedia results found." |
|
|
except Exception as e: |
|
|
return f"Wikipedia search error: {str(e)}" |
|
|
|
|
|
|
|
|
search_results = self._search_multiple_terms(entities) |
|
|
|
|
|
if not search_results: |
|
|
return "No relevant Wikipedia articles found." |
|
|
|
|
|
|
|
|
formatted_results = [] |
|
|
for entity, content in search_results.items(): |
|
|
relevant_content = self._extract_relevant_sections(content, question) |
|
|
if relevant_content: |
|
|
formatted_results.append(f"=== {entity} ===\n{relevant_content}") |
|
|
|
|
|
if not formatted_results: |
|
|
return "No relevant information found in Wikipedia articles." |
|
|
|
|
|
return "ENHANCED WIKIPEDIA RESULTS:\n\n" + "\n\n".join(formatted_results) |
|
|
|
|
|
except Exception as e: |
|
|
return f"Enhanced Wikipedia error: {str(e)}" |
|
|
|
|
|
|
|
|
def excel_to_markdown(excel_path: str, sheet_name: Optional[str] = None) -> str: |
|
|
"""Enhanced Excel tool with better error handling and data analysis""" |
|
|
try: |
|
|
file_path = Path(excel_path).expanduser().resolve() |
|
|
if not file_path.is_file(): |
|
|
return f"Error: Excel file not found at {file_path}" |
|
|
|
|
|
sheet: Union[str, int] = ( |
|
|
int(sheet_name) if sheet_name and sheet_name.isdigit() else sheet_name or 0 |
|
|
) |
|
|
df = pd.read_excel(file_path, sheet_name=sheet) |
|
|
df = df.iloc[:, :-1] |
|
|
|
|
|
|
|
|
metadata = f"EXCEL FILE ANALYSIS:\n" |
|
|
metadata += f"File: {file_path.name}\n" |
|
|
metadata += f"Dimensions: {len(df)} rows × {len(df.columns)} columns\n" |
|
|
metadata += f"Columns: {', '.join(df.columns.tolist())}\n" |
|
|
metadata += f"Data types: {dict(df.dtypes)}\n" |
|
|
|
|
|
|
|
|
numeric_cols = df.select_dtypes(include=['number']).columns |
|
|
if len(numeric_cols) > 0: |
|
|
metadata += f"Numeric columns: {list(numeric_cols)}\n" |
|
|
for col in numeric_cols: |
|
|
metadata += f" {col}: mean={df[col].mean():.2f}, min={df[col].min()}, max={df[col].max()}, sum={df[col].sum()}\n" |
|
|
|
|
|
metadata += "\nSAMPLE DATA (first 10 rows):\n" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return metadata |
|
|
|
|
|
except Exception as e: |
|
|
return f"Error reading Excel file: {str(e)}" |
|
|
|
|
|
|
|
|
import os |
|
|
import mimetypes |
|
|
from pathlib import Path |
|
|
|
|
|
def image_file_info(image_path: str, question: str) -> str: |
|
|
"""Enhanced image file analysis using Gemini API""" |
|
|
try: |
|
|
|
|
|
if not os.path.exists(image_path): |
|
|
return f"Error: Image file not found at {image_path}" |
|
|
|
|
|
|
|
|
try: |
|
|
import google.generativeai as genai |
|
|
from PIL import Image |
|
|
|
|
|
|
|
|
genai.configure(api_key=os.getenv("GEMINI_API_KEY")) |
|
|
|
|
|
|
|
|
model = genai.GenerativeModel('gemini-1.5-flash') |
|
|
|
|
|
|
|
|
try: |
|
|
image = Image.open(image_path) |
|
|
|
|
|
if image.mode in ('RGBA', 'LA'): |
|
|
background = Image.new('RGB', image.size, (255, 255, 255)) |
|
|
if image.mode == 'RGBA': |
|
|
background.paste(image, mask=image.split()[-1]) |
|
|
else: |
|
|
background.paste(image, mask=image.split()[-1]) |
|
|
image = background |
|
|
elif image.mode != 'RGB': |
|
|
image = image.convert('RGB') |
|
|
|
|
|
except Exception as img_error: |
|
|
return f"Error opening image: {img_error}" |
|
|
|
|
|
|
|
|
response = model.generate_content([question, image]) |
|
|
|
|
|
return response.text |
|
|
|
|
|
except ImportError: |
|
|
|
|
|
try: |
|
|
from google import genai |
|
|
from google.genai import types |
|
|
|
|
|
|
|
|
client = genai.Client(api_key=os.getenv("GEMINI_API_KEY")) |
|
|
|
|
|
|
|
|
with open(image_path, "rb") as f: |
|
|
img_bytes = f.read() |
|
|
|
|
|
|
|
|
mime_type, _ = mimetypes.guess_type(image_path) |
|
|
if mime_type is None or not mime_type.startswith('image/'): |
|
|
|
|
|
if image_path.lower().endswith('.png'): |
|
|
mime_type = "image/png" |
|
|
else: |
|
|
mime_type = "image/jpeg" |
|
|
|
|
|
|
|
|
response = client.models.generate_content( |
|
|
model="gemini-1.5-flash", |
|
|
contents=[ |
|
|
question, |
|
|
types.Part.from_bytes(data=img_bytes, mime_type=mime_type) |
|
|
], |
|
|
) |
|
|
|
|
|
return response.text |
|
|
|
|
|
except Exception as new_sdk_error: |
|
|
return f"Error with both SDKs. New SDK error: {new_sdk_error}" |
|
|
|
|
|
except Exception as e: |
|
|
return f"Error during image analysis: {e}" |
|
|
|
|
|
def audio_file_info(audio_path: str) -> str: |
|
|
"""Returns only the transcription of an audio file.""" |
|
|
try: |
|
|
model = whisper.load_model("tiny") |
|
|
result = model.transcribe(audio_path, fp16=False) |
|
|
return result['text'] |
|
|
except Exception as e: |
|
|
return f"Error transcribing audio: {str(e)}" |
|
|
|
|
|
def code_file_read(code_path: str) -> str: |
|
|
"""Enhanced code file analysis""" |
|
|
try: |
|
|
with open(code_path, "r", encoding="utf-8") as f: |
|
|
content = f.read() |
|
|
|
|
|
file_path = Path(code_path) |
|
|
|
|
|
info = f"CODE FILE ANALYSIS:\n" |
|
|
info += f"File: {file_path.name}\n" |
|
|
info += f"Extension: {file_path.suffix}\n" |
|
|
info += f"Size: {len(content)} characters, {len(content.splitlines())} lines\n" |
|
|
|
|
|
|
|
|
if file_path.suffix == '.py': |
|
|
|
|
|
import_lines = [line for line in content.splitlines() if line.strip().startswith(('import ', 'from '))] |
|
|
if import_lines: |
|
|
info += f"Imports ({len(import_lines)}): {', '.join(import_lines[:5])}\n" |
|
|
|
|
|
|
|
|
func_count = len(re.findall(r'^def\s+\w+', content, re.MULTILINE)) |
|
|
class_count = len(re.findall(r'^class\s+\w+', content, re.MULTILINE)) |
|
|
info += f"Functions: {func_count}, Classes: {class_count}\n" |
|
|
|
|
|
info += f"\nCODE CONTENT:\n{content}" |
|
|
return info |
|
|
|
|
|
except Exception as e: |
|
|
return f"Error reading code file: {e}" |
|
|
|
|
|
|
|
|
import yt_dlp |
|
|
from pathlib import Path |
|
|
|
|
|
def extract_youtube_info(question: str) -> str: |
|
|
""" |
|
|
Download a YouTube video or audio using yt-dlp without merging. |
|
|
|
|
|
Parameters: |
|
|
- url: str — YouTube URL |
|
|
- audio_only: bool — if True, downloads audio only; else best single video+audio stream |
|
|
|
|
|
Returns: |
|
|
- str: path to downloaded file or error message |
|
|
""" |
|
|
pattern = r"(https?://(?:www\.)?(?:youtube\.com/watch\?v=[\w\-]+|youtu\.be/[\w\-]+))" |
|
|
match = re.search(pattern, question) |
|
|
youtube_url = match.group(1) if match else None |
|
|
|
|
|
print(f"Extracting YouTube URL: {youtube_url}") |
|
|
try: |
|
|
|
|
|
video_id = re.search(r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})', youtube_url).group(1) |
|
|
|
|
|
|
|
|
transcript_list = YouTubeTranscriptApi.get_transcript(video_id) |
|
|
|
|
|
|
|
|
full_transcript = ' '.join([entry['text'] for entry in transcript_list]) |
|
|
|
|
|
|
|
|
full_transcript = re.sub(r'\s+', ' ', full_transcript).strip() |
|
|
|
|
|
return full_transcript |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error getting transcript: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|