|
|
""" |
|
|
Enhanced Web Search Tool for MCP Server. |
|
|
Takes user query, performs web search using multiple strategies, and returns results with sources. |
|
|
Optimized for reliability and real-time information retrieval. |
|
|
|
|
|
RECOMMENDED SERVER-SIDE APPROACH: |
|
|
Before calling this tool for financial queries, use an LLM to extract ticker symbols: |
|
|
|
|
|
Example LLM prompt: |
|
|
"Extract the stock ticker symbol from this query: 'What's NVIDIA's stock price?' |
|
|
If it's a financial query, return just the ticker (e.g., 'NVDA'). |
|
|
If not financial, return 'NOT_FINANCIAL'." |
|
|
|
|
|
Then call this tool with: "NVDA stock price" |
|
|
|
|
|
This approach is much more reliable than complex pattern matching. |
|
|
""" |
|
|
from smolagents import Tool |
|
|
from typing import Dict, Any, Optional, List |
|
|
import requests |
|
|
import re |
|
|
from datetime import datetime |
|
|
from urllib.parse import quote_plus, urlparse |
|
|
from bs4 import BeautifulSoup |
|
|
import json |
|
|
import time |
|
|
|
|
|
class WebSearchTool(Tool): |
|
|
"""Enhanced web search tool for real-time information.""" |
|
|
|
|
|
def __init__(self): |
|
|
self.name = "web_search" |
|
|
self.description = "Search the web for real-time information using multiple search engines" |
|
|
self.input_type = "object" |
|
|
self.output_type = "object" |
|
|
self.inputs = { |
|
|
"query": { |
|
|
"type": "string", |
|
|
"description": "The search query" |
|
|
}, |
|
|
"max_results": { |
|
|
"type": "integer", |
|
|
"description": "Maximum number of results to return (default: 5)", |
|
|
"optional": True, |
|
|
"nullable": True |
|
|
} |
|
|
} |
|
|
self.outputs = { |
|
|
"results": { |
|
|
"type": "array", |
|
|
"description": "Search results with title, snippet, url, and source" |
|
|
}, |
|
|
"summary": { |
|
|
"type": "string", |
|
|
"description": "Formatted summary of the search results" |
|
|
}, |
|
|
"metadata": { |
|
|
"type": "object", |
|
|
"description": "Search metadata" |
|
|
} |
|
|
} |
|
|
self.required_inputs = ["query"] |
|
|
self.is_initialized = True |
|
|
|
|
|
self.session = requests.Session() |
|
|
self.session.headers.update({ |
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', |
|
|
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', |
|
|
'Accept-Language': 'en-US,en;q=0.9', |
|
|
'Accept-Encoding': 'gzip, deflate, br', |
|
|
'DNT': '1', |
|
|
'Connection': 'keep-alive', |
|
|
'Upgrade-Insecure-Requests': '1' |
|
|
}) |
|
|
self.timeout = 15 |
|
|
|
|
|
def forward(self, query: str, max_results: Optional[int] = None) -> Dict[str, Any]: |
|
|
"""Perform web search and return results.""" |
|
|
max_results = max_results or 5 |
|
|
|
|
|
try: |
|
|
|
|
|
search_results = self._search_web_enhanced(query, max_results) |
|
|
|
|
|
|
|
|
summary = self._generate_summary(query, search_results) |
|
|
|
|
|
return { |
|
|
"results": search_results, |
|
|
"summary": summary, |
|
|
"metadata": { |
|
|
"query": query, |
|
|
"total_found": len(search_results), |
|
|
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), |
|
|
"search_engine": "multi-engine" |
|
|
} |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
return { |
|
|
"results": [], |
|
|
"summary": f"# Search Error\n\nUnable to fetch results for: *{query}*\n\nError: {str(e)}", |
|
|
"metadata": { |
|
|
"query": query, |
|
|
"error": str(e), |
|
|
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S') |
|
|
} |
|
|
} |
|
|
|
|
|
def _search_web_enhanced(self, query: str, max_results: int) -> List[Dict[str, Any]]: |
|
|
"""Enhanced web search with multiple fallback strategies.""" |
|
|
|
|
|
|
|
|
if self._is_financial_query(query): |
|
|
print("π Detected financial query, trying specialized sources...") |
|
|
live_financial_data = self._get_live_financial_data(query) |
|
|
if live_financial_data: |
|
|
return live_financial_data |
|
|
|
|
|
|
|
|
search_strategies = [ |
|
|
("DuckDuckGo Instant Answer", self._search_duckduckgo_instant), |
|
|
("DuckDuckGo HTML", self._search_duckduckgo_html), |
|
|
("Bing", self._search_bing_enhanced), |
|
|
("Yahoo", self._search_yahoo_enhanced), |
|
|
("Alternative Search", self._search_alternative) |
|
|
] |
|
|
|
|
|
all_results = [] |
|
|
successful_strategies = 0 |
|
|
|
|
|
for strategy_name, strategy_func in search_strategies: |
|
|
try: |
|
|
print(f"π Trying {strategy_name}...") |
|
|
results = strategy_func(query, max_results) |
|
|
if results: |
|
|
print(f"β
{strategy_name} found {len(results)} results") |
|
|
all_results.extend(results) |
|
|
successful_strategies += 1 |
|
|
|
|
|
|
|
|
if len(all_results) >= max_results and successful_strategies >= 1: |
|
|
break |
|
|
else: |
|
|
print(f"β οΈ {strategy_name} returned no results") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"β {strategy_name} failed: {str(e)}") |
|
|
continue |
|
|
|
|
|
|
|
|
seen_urls = set() |
|
|
unique_results = [] |
|
|
|
|
|
for result in all_results: |
|
|
url = result.get('url', '') |
|
|
if url and url not in seen_urls and len(unique_results) < max_results: |
|
|
seen_urls.add(url) |
|
|
unique_results.append(result) |
|
|
|
|
|
|
|
|
if unique_results: |
|
|
enhanced_results = self._enhance_results_with_content(unique_results, query) |
|
|
return enhanced_results |
|
|
|
|
|
return unique_results |
|
|
|
|
|
def _search_duckduckgo_instant(self, query: str, max_results: int) -> List[Dict[str, Any]]: |
|
|
"""Search using DuckDuckGo Instant Answer API.""" |
|
|
try: |
|
|
|
|
|
api_url = f"https://api.duckduckgo.com/" |
|
|
params = { |
|
|
'q': query, |
|
|
'format': 'json', |
|
|
'no_html': '1', |
|
|
'skip_disambig': '1' |
|
|
} |
|
|
|
|
|
response = self.session.get(api_url, params=params, timeout=self.timeout) |
|
|
response.raise_for_status() |
|
|
|
|
|
data = response.json() |
|
|
results = [] |
|
|
|
|
|
|
|
|
if data.get('Answer'): |
|
|
results.append({ |
|
|
'title': f"Instant Answer: {query}", |
|
|
'snippet': data['Answer'], |
|
|
'url': data.get('AnswerURL', 'https://duckduckgo.com'), |
|
|
'source': 'DuckDuckGo Instant', |
|
|
'type': 'instant_answer' |
|
|
}) |
|
|
|
|
|
|
|
|
if data.get('Abstract'): |
|
|
results.append({ |
|
|
'title': data.get('Heading', query), |
|
|
'snippet': data['Abstract'], |
|
|
'url': data.get('AbstractURL', 'https://duckduckgo.com'), |
|
|
'source': data.get('AbstractSource', 'DuckDuckGo'), |
|
|
'type': 'abstract' |
|
|
}) |
|
|
|
|
|
|
|
|
if data.get('RelatedTopics'): |
|
|
for topic in data['RelatedTopics'][:2]: |
|
|
if isinstance(topic, dict) and topic.get('Text'): |
|
|
results.append({ |
|
|
'title': topic.get('FirstURL', '').split('/')[-1].replace('_', ' ').title(), |
|
|
'snippet': topic['Text'], |
|
|
'url': topic.get('FirstURL', ''), |
|
|
'source': 'DuckDuckGo Related', |
|
|
'type': 'related' |
|
|
}) |
|
|
|
|
|
return results[:max_results] |
|
|
|
|
|
except Exception as e: |
|
|
print(f"DuckDuckGo Instant API error: {e}") |
|
|
return [] |
|
|
|
|
|
def _search_duckduckgo_html(self, query: str, max_results: int) -> List[Dict[str, Any]]: |
|
|
"""Search using DuckDuckGo HTML interface.""" |
|
|
try: |
|
|
search_url = f"https://html.duckduckgo.com/html/" |
|
|
params = {'q': query} |
|
|
|
|
|
headers = { |
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', |
|
|
'Accept': 'text/html,application/xhtml+xml', |
|
|
'Accept-Language': 'en-US,en;q=0.9' |
|
|
} |
|
|
|
|
|
response = requests.get(search_url, params=params, headers=headers, timeout=self.timeout) |
|
|
response.raise_for_status() |
|
|
|
|
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
|
results = [] |
|
|
|
|
|
|
|
|
for link in soup.find_all('a', class_='result__a'): |
|
|
if len(results) >= max_results: |
|
|
break |
|
|
|
|
|
href = link.get('href') |
|
|
title = link.get_text(strip=True) |
|
|
|
|
|
if href and title and len(title) > 10: |
|
|
|
|
|
snippet = "" |
|
|
result_snippet = link.find_next('a', class_='result__snippet') |
|
|
if result_snippet: |
|
|
snippet = result_snippet.get_text(strip=True) |
|
|
|
|
|
results.append({ |
|
|
'title': title, |
|
|
'snippet': snippet, |
|
|
'url': href, |
|
|
'source': self._get_source_name(href), |
|
|
'type': 'search_result' |
|
|
}) |
|
|
|
|
|
return results |
|
|
|
|
|
except Exception as e: |
|
|
print(f"DuckDuckGo HTML search error: {e}") |
|
|
return [] |
|
|
|
|
|
def _search_bing_enhanced(self, query: str, max_results: int) -> List[Dict[str, Any]]: |
|
|
"""Enhanced Bing search with better parsing.""" |
|
|
try: |
|
|
search_url = f"https://www.bing.com/search" |
|
|
params = {'q': query, 'count': max_results} |
|
|
|
|
|
headers = { |
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', |
|
|
'Accept': 'text/html,application/xhtml+xml' |
|
|
} |
|
|
|
|
|
response = requests.get(search_url, params=params, headers=headers, timeout=self.timeout) |
|
|
response.raise_for_status() |
|
|
|
|
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
|
results = [] |
|
|
|
|
|
|
|
|
for result in soup.find_all('li', class_='b_algo'): |
|
|
if len(results) >= max_results: |
|
|
break |
|
|
|
|
|
title_elem = result.find('h2') |
|
|
if not title_elem: |
|
|
continue |
|
|
|
|
|
link_elem = title_elem.find('a') |
|
|
if not link_elem: |
|
|
continue |
|
|
|
|
|
title = link_elem.get_text(strip=True) |
|
|
href = link_elem.get('href') |
|
|
|
|
|
|
|
|
snippet = "" |
|
|
snippet_elem = result.find('p', class_='b_para') or result.find('div', class_='b_caption') |
|
|
if snippet_elem: |
|
|
snippet = snippet_elem.get_text(strip=True) |
|
|
|
|
|
if href and title and len(title) > 5: |
|
|
results.append({ |
|
|
'title': title, |
|
|
'snippet': snippet, |
|
|
'url': href, |
|
|
'source': self._get_source_name(href), |
|
|
'type': 'search_result' |
|
|
}) |
|
|
|
|
|
return results |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Bing search error: {e}") |
|
|
return [] |
|
|
|
|
|
def _search_yahoo_enhanced(self, query: str, max_results: int) -> List[Dict[str, Any]]: |
|
|
"""Enhanced Yahoo search.""" |
|
|
try: |
|
|
search_url = f"https://search.yahoo.com/search" |
|
|
params = {'p': query, 'n': max_results} |
|
|
|
|
|
headers = { |
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', |
|
|
'Accept': 'text/html,application/xhtml+xml' |
|
|
} |
|
|
|
|
|
response = requests.get(search_url, params=params, headers=headers, timeout=self.timeout) |
|
|
response.raise_for_status() |
|
|
|
|
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
|
results = [] |
|
|
|
|
|
|
|
|
for result in soup.find_all('div', class_='dd algo'): |
|
|
if len(results) >= max_results: |
|
|
break |
|
|
|
|
|
title_elem = result.find('h3') |
|
|
if not title_elem: |
|
|
continue |
|
|
|
|
|
link_elem = title_elem.find('a') |
|
|
if not link_elem: |
|
|
continue |
|
|
|
|
|
title = link_elem.get_text(strip=True) |
|
|
href = link_elem.get('href') |
|
|
|
|
|
|
|
|
snippet = "" |
|
|
snippet_elem = result.find('span', class_='s') or result.find('p') |
|
|
if snippet_elem: |
|
|
snippet = snippet_elem.get_text(strip=True) |
|
|
|
|
|
if href and title and len(title) > 5: |
|
|
results.append({ |
|
|
'title': title, |
|
|
'snippet': snippet, |
|
|
'url': href, |
|
|
'source': self._get_source_name(href), |
|
|
'type': 'search_result' |
|
|
}) |
|
|
|
|
|
return results |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Yahoo search error: {e}") |
|
|
return [] |
|
|
|
|
|
    def _search_alternative(self, query: str, max_results: int) -> List[Dict[str, Any]]:
        """Alternative search method using Startpage.

        Last-resort strategy: since Startpage's markup changes often, the
        raw HTML is mined with a generic anchor-tag regex instead of a
        structured parser, and obvious non-result links are filtered out.
        Returns an empty list on any error.
        """
        try:
            search_url = f"https://www.startpage.com/sp/search"
            params = {'query': query, 'num': max_results}

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'text/html,application/xhtml+xml'
            }

            response = requests.get(search_url, params=params, headers=headers, timeout=self.timeout)
            response.raise_for_status()

            results = []

            # Generic <a href="...">text</a> matcher: group 1 is the URL,
            # group 2 the link text (no nested tags allowed by [^<]+).
            link_pattern = r'<a[^>]+href=["\']([^"\']+)["\'][^>]*>([^<]+)</a>'
            matches = re.findall(link_pattern, response.text, re.IGNORECASE)

            seen_urls = set()
            for url, title in matches:
                if len(results) >= max_results:
                    break

                # Keep only absolute, unseen links that are not Startpage
                # chrome or search-engine self-links.
                if (url.startswith('http') and
                    url not in seen_urls and
                    not any(skip in url.lower() for skip in ['startpage.com', 'google.com/search', 'privacy'])):

                    # Strip any residual tags; very short titles are noise.
                    clean_title = re.sub(r'<[^>]+>', '', title).strip()
                    if len(clean_title) > 10:
                        seen_urls.add(url)
                        results.append({
                            'title': clean_title,
                            'snippet': "",
                            'url': url,
                            'source': self._get_source_name(url),
                            'type': 'search_result'
                        })

            return results

        except Exception as e:
            print(f"Alternative search error: {e}")
            return []
|
|
|
|
|
def _enhance_results_with_content(self, results: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]: |
|
|
"""Enhance search results by scraping actual content.""" |
|
|
enhanced_results = [] |
|
|
|
|
|
for result in results: |
|
|
|
|
|
if result.get('snippet') and len(result['snippet']) > 50: |
|
|
enhanced_results.append(result) |
|
|
continue |
|
|
|
|
|
|
|
|
try: |
|
|
scraped_content = self._scrape_url_content(result['url'], query) |
|
|
if scraped_content and not scraped_content.startswith("Unable to"): |
|
|
result['snippet'] = scraped_content[:500] + ("..." if len(scraped_content) > 500 else "") |
|
|
result['live_data'] = True |
|
|
enhanced_results.append(result) |
|
|
except Exception as e: |
|
|
print(f"Failed to enhance result from {result['url']}: {e}") |
|
|
enhanced_results.append(result) |
|
|
|
|
|
return enhanced_results |
|
|
|
|
|
def _is_financial_query(self, query: str) -> bool: |
|
|
"""Check if the query is asking for financial/stock information using generic patterns.""" |
|
|
financial_keywords = [ |
|
|
'stock', 'price', 'share', 'ticker', 'quote', 'market', 'trading', |
|
|
'nasdaq', 'nyse', 'equity', 'dividend', 'earnings', 'financial', |
|
|
'stock price', 'share price', 'market cap', 'market value', |
|
|
'investment', 'securities', 'publicly traded', 'listed company' |
|
|
] |
|
|
|
|
|
|
|
|
financial_patterns = [ |
|
|
r'\bstock\s+price\b', |
|
|
r'\bshare\s+price\b', |
|
|
r'\bmarket\s+value\b', |
|
|
r'\bmarket\s+cap\b', |
|
|
r'\bhow\s+much\s+is\s+\w+\s+worth\b', |
|
|
r'\bwhat\s+is\s+\w+\s+trading\s+at\b', |
|
|
r'\bcurrent\s+price\s+of\b', |
|
|
r'\bstock\s+quote\b', |
|
|
r'\bfinancial\s+data\b', |
|
|
r'\binvestment\s+information\b' |
|
|
] |
|
|
|
|
|
query_lower = query.lower() |
|
|
|
|
|
|
|
|
if any(keyword in query_lower for keyword in financial_keywords): |
|
|
return True |
|
|
|
|
|
|
|
|
for pattern in financial_patterns: |
|
|
if re.search(pattern, query_lower): |
|
|
return True |
|
|
|
|
|
|
|
|
ticker_patterns = [ |
|
|
r'\$[A-Z]{1,6}\b', |
|
|
r'\b[A-Z]{2,6}\s+(stock|price|quote|shares?)\b', |
|
|
r'\b(stock|price|quote|shares?)\s+[A-Z]{2,6}\b', |
|
|
r'\b[A-Z]{2,6}\.(NYSE|NASDAQ|NYSE)\b', |
|
|
] |
|
|
|
|
|
for pattern in ticker_patterns: |
|
|
if re.search(pattern, query.upper()): |
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
company_financial_pattern = r'\b\w+\s+(stock|price|share|financial|trading|market|investment)\b' |
|
|
if re.search(company_financial_pattern, query_lower): |
|
|
return True |
|
|
|
|
|
return False |
|
|
|
|
|
def _detect_ticker_symbol(self, query: str) -> str: |
|
|
"""Enhanced ticker detection using multiple patterns and strategies.""" |
|
|
query_upper = query.upper() |
|
|
|
|
|
|
|
|
excluded_words = { |
|
|
'WHAT', 'WHATS', 'WHERE', 'WHEN', 'WHO', 'HOW', 'WHY', 'WHICH', 'THE', 'AND', 'OR', |
|
|
'FOR', 'ARE', 'BUT', 'NOT', 'YOU', 'ALL', 'CAN', 'WAS', 'ONE', 'TWO', 'NEW', 'OLD', |
|
|
'STOCK', 'PRICE', 'CURRENT', 'SHARE', 'QUOTE', 'MARKET', 'TRADING', 'TODAY', 'NOW', |
|
|
'IS', 'OF', 'IN', 'ON', 'AT', 'BY', 'UP', 'TO', 'AS', 'AN', 'A', 'THIS', 'THAT', |
|
|
'WITH', 'FROM', 'ABOUT', 'INTO', 'THROUGH', 'DURING', 'BEFORE', 'AFTER', 'ABOVE', |
|
|
'BELOW', 'DOWN', 'OUT', 'OFF', 'OVER', 'UNDER', 'AGAIN', 'FURTHER', 'THEN', 'ONCE', |
|
|
'NYSE', 'NASDAQ', 'EXCHANGE', 'COMPANY', 'CORP', 'INC', 'LTD', 'LLC', 'CORPORATION', |
|
|
'FINANCIAL', 'DATA', 'INFO', 'INFORMATION', 'LATEST', 'RECENT', 'LIVE', 'REAL', 'TIME' |
|
|
} |
|
|
|
|
|
|
|
|
direct_ticker_patterns = [ |
|
|
r'\$([A-Z]{1,6})\b', |
|
|
r'\b([A-Z]{2,6})\.(NYSE|NASDAQ)\b', |
|
|
r'ticker[:\s]+([A-Z]{2,6})\b', |
|
|
r'symbol[:\s]+([A-Z]{2,6})\b', |
|
|
] |
|
|
|
|
|
for pattern in direct_ticker_patterns: |
|
|
matches = re.findall(pattern, query_upper) |
|
|
for match in matches: |
|
|
ticker = match[0] if isinstance(match, tuple) else match |
|
|
if ticker not in excluded_words and 1 <= len(ticker) <= 6: |
|
|
return ticker |
|
|
|
|
|
|
|
|
context_patterns = [ |
|
|
r'\b([A-Z]{2,6})\s+(stock|price|quote|shares?)\b', |
|
|
r'\b(stock|price|quote|shares?)\s+([A-Z]{2,6})\b', |
|
|
r'\bof\s+([A-Z]{2,6})\b', |
|
|
] |
|
|
|
|
|
for pattern in context_patterns: |
|
|
matches = re.findall(pattern, query_upper) |
|
|
for match in matches: |
|
|
ticker = match[1] if isinstance(match, tuple) and len(match) > 1 else match[0] if isinstance(match, tuple) else match |
|
|
if ticker not in excluded_words and 1 <= len(ticker) <= 6: |
|
|
return ticker |
|
|
|
|
|
|
|
|
|
|
|
company_patterns = [ |
|
|
r'\b(apple)\b.*(?:stock|price|quote)', |
|
|
r'\b(microsoft)\b.*(?:stock|price|quote)', |
|
|
r'\b(nvidia)\b.*(?:stock|price|quote)', |
|
|
r'\b(amazon)\b.*(?:stock|price|quote)', |
|
|
r'\b(google|alphabet)\b.*(?:stock|price|quote)', |
|
|
r'\b(tesla)\b.*(?:stock|price|quote)', |
|
|
r'\b(meta|facebook)\b.*(?:stock|price|quote)', |
|
|
r'\b(netflix)\b.*(?:stock|price|quote)', |
|
|
] |
|
|
|
|
|
company_to_ticker = { |
|
|
'apple': 'AAPL', |
|
|
'microsoft': 'MSFT', |
|
|
'nvidia': 'NVDA', |
|
|
'amazon': 'AMZN', |
|
|
'google': 'GOOGL', |
|
|
'alphabet': 'GOOGL', |
|
|
'tesla': 'TSLA', |
|
|
'meta': 'META', |
|
|
'facebook': 'META', |
|
|
'netflix': 'NFLX' |
|
|
} |
|
|
|
|
|
query_lower = query.lower() |
|
|
for pattern in company_patterns: |
|
|
matches = re.findall(pattern, query_lower) |
|
|
for match in matches: |
|
|
company = match.lower() |
|
|
if company in company_to_ticker: |
|
|
return company_to_ticker[company] |
|
|
|
|
|
|
|
|
words = query_upper.replace('?', '').replace('.', '').replace(',', '').split() |
|
|
|
|
|
for word in words: |
|
|
|
|
|
clean_word = ''.join(c for c in word if c.isalnum() or c in ['-']) |
|
|
|
|
|
|
|
|
if (2 <= len(clean_word) <= 6 and |
|
|
clean_word.isupper() and |
|
|
clean_word not in excluded_words and |
|
|
not clean_word.isdigit() and |
|
|
clean_word.isalpha()): |
|
|
|
|
|
|
|
|
|
|
|
if not any(pattern in clean_word.lower() for pattern in ['the', 'and', 'for', 'are', 'but']): |
|
|
return clean_word |
|
|
|
|
|
return None |
|
|
|
|
|
    def _enhance_ticker_detection_with_context(self, query: str) -> Optional[str]:
        """Enhanced ticker detection using context clues and patterns.

        First defers to ``_detect_ticker_symbol``; if that yields nothing,
        tries phrase templates such as "price of X" / "how much is X worth"
        and treats any short alphabetic capture as a candidate ticker.
        Returns the candidate symbol or ``None``.
        """
        # Primary detector handles explicit tickers and known company names.
        ticker = self._detect_ticker_symbol(query)
        if ticker:
            return ticker

        query_lower = query.lower()

        # Phrase templates whose capture group is a probable company/ticker.
        company_phrases = [
            r"(?:stock\s+price\s+of\s+|price\s+of\s+|quote\s+for\s+)(\w+)",
            r"(\w+)(?:\s+stock|\s+share|\s+price|\s+quote)",
            r"how\s+much\s+is\s+(\w+)\s+(?:worth|trading)",
            r"what(?:'s|\s+is)\s+(\w+)\s+(?:trading\s+at|worth|price)"
        ]

        for pattern in company_phrases:
            matches = re.findall(pattern, query_lower)
            for match in matches:
                company_name = match.strip().upper()
                # NOTE(review): this assumes any captured 2-6 letter word IS
                # a ticker ("price of gold" would yield "GOLD") -- confirm
                # that downstream data sources tolerate bogus symbols.
                if 2 <= len(company_name) <= 6 and company_name.isalpha():
                    return company_name

        return None
|
|
|
|
|
def _get_live_financial_data(self, query: str) -> List[Dict[str, Any]]: |
|
|
"""Get live financial data for detected ticker using enhanced detection.""" |
|
|
|
|
|
ticker = self._enhance_ticker_detection_with_context(query) |
|
|
|
|
|
|
|
|
if not ticker: |
|
|
ticker = self._detect_ticker_symbol(query) |
|
|
|
|
|
if not ticker: |
|
|
print("β No ticker symbol detected in query") |
|
|
return None |
|
|
|
|
|
print(f"π― Detected ticker: {ticker}") |
|
|
|
|
|
|
|
|
data_sources = [ |
|
|
self._get_yahoo_finance_data, |
|
|
self._get_alphavantage_data, |
|
|
self._get_financial_summary_data |
|
|
] |
|
|
|
|
|
for source in data_sources: |
|
|
try: |
|
|
print(f"π Trying {source.__name__} for {ticker}...") |
|
|
data = source(ticker) |
|
|
if data: |
|
|
print(f"β
Successfully got data from {source.__name__}") |
|
|
return [data] |
|
|
else: |
|
|
print(f"β οΈ No data from {source.__name__}") |
|
|
except Exception as e: |
|
|
print(f"β Failed to get data from {source.__name__}: {e}") |
|
|
continue |
|
|
|
|
|
print(f"β All financial data sources failed for ticker: {ticker}") |
|
|
return None |
|
|
|
|
|
    def _get_yahoo_finance_data(self, ticker: str) -> Optional[Dict[str, Any]]:
        """Get live data from Yahoo Finance API-like endpoint.

        Returns a search-result-shaped dict (title/snippet/url/source with
        ``live_data: True``) or ``None`` on any failure or missing payload.
        """
        try:
            # Unofficial but widely used JSON chart endpoint; no API key.
            url = f"https://query1.finance.yahoo.com/v8/finance/chart/{ticker}"

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }

            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            data = response.json()

            if 'chart' in data and 'result' in data['chart'] and data['chart']['result']:
                result = data['chart']['result'][0]
                meta = result.get('meta', {})

                current_price = meta.get('regularMarketPrice', 0)
                previous_close = meta.get('previousClose', 0)
                # Only compute deltas when both prices are present/non-zero.
                change = current_price - previous_close if current_price and previous_close else 0
                change_percent = (change / previous_close * 100) if previous_close else 0

                # NOTE(review): 'longName' may not exist in this endpoint's
                # meta block -- falls back to the raw ticker; confirm field.
                company_name = meta.get('longName', ticker)

                snippet = f"π° Live Stock Data:\n"
                snippet += f"π’ {company_name} ({ticker})\n"
                snippet += f"π΅ Current Price: ${current_price:.2f}\n"
                snippet += f"π Change: ${change:+.2f} ({change_percent:+.2f}%)\n"
                snippet += f"π Previous Close: ${previous_close:.2f}\n"
                snippet += f"π Live data as of {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

                return {
                    'title': f'{company_name} ({ticker}) - Live Stock Quote',
                    'snippet': snippet,
                    'url': f'https://finance.yahoo.com/quote/{ticker}',
                    'source': 'Yahoo Finance API',
                    'live_data': True
                }

        except Exception as e:
            print(f"Yahoo Finance API error: {e}")

        # Falls through to None when the payload lacked chart data.
        return None
|
|
|
|
|
    def _get_alphavantage_data(self, ticker: str) -> Optional[Dict[str, Any]]:
        """Try to get data from Alpha Vantage free tier.

        Returns a search-result-shaped dict with ``live_data: True`` or
        ``None`` on failure.
        """
        try:
            # NOTE(review): the 'demo' API key generally serves only Alpha
            # Vantage's sample symbols; real tickers likely need a registered
            # key -- confirm before relying on this source.
            url = f"https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol={ticker}&apikey=demo"

            response = requests.get(url, timeout=10)
            response.raise_for_status()

            data = response.json()

            if 'Global Quote' in data:
                quote = data['Global Quote']
                # Alpha Vantage prefixes field names with ordinal numbers.
                price = quote.get('05. price', 'N/A')
                change = quote.get('09. change', 'N/A')
                change_percent = quote.get('10. change percent', 'N/A')

                snippet = f"π° Live Stock Data (Alpha Vantage):\n"
                snippet += f"π’ {ticker}\n"
                snippet += f"π΅ Price: ${price}\n"
                snippet += f"π Change: {change} ({change_percent})\n"
                snippet += f"π Live data as of {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

                return {
                    'title': f'{ticker} - Live Stock Quote',
                    'snippet': snippet,
                    'url': f'https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol={ticker}',
                    'source': 'Alpha Vantage',
                    'live_data': True
                }

        except Exception as e:
            print(f"Alpha Vantage error: {e}")

        return None
|
|
|
|
|
    def _get_financial_summary_data(self, ticker: str) -> Optional[Dict[str, Any]]:
        """Get financial data by scraping investor relations or financial sites.

        Tries several public quote pages and pulls the first plausible price
        via regex.  Returns a search-result-shaped dict with
        ``live_data: True`` or ``None`` when every page/pattern fails.
        """
        try:
            # Quote pages ordered roughly from most to least structured.
            urls_to_try = [
                f"https://finance.yahoo.com/quote/{ticker}",
                f"https://www.google.com/finance/quote/{ticker}:NASDAQ",
                f"https://www.marketwatch.com/investing/stock/{ticker}",
            ]

            for url in urls_to_try:
                try:
                    # Mobile UA: mobile pages are lighter and less protected.
                    headers = {
                        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15'
                    }

                    response = requests.get(url, headers=headers, timeout=8)
                    if response.status_code == 200:
                        # Price extraction patterns, most specific first; the
                        # bare "$123.45" fallback is deliberately loose and
                        # relies on the sanity window below to reject noise.
                        price_patterns = [
                            r'data-symbol="' + ticker + r'"[^>]*data-field="regularMarketPrice"[^>]*>([^<]+)',
                            r'"regularMarketPrice":\s*(\d+\.?\d*)',
                            r'price["\s:]*([0-9,]+\.?\d*)',
                            r'\$([0-9,]+\.?\d*)',
                        ]

                        for pattern in price_patterns:
                            matches = re.findall(pattern, response.text, re.IGNORECASE)
                            if matches:
                                price = matches[0].replace(',', '')
                                try:
                                    price_float = float(price)
                                    # Sanity window to reject page noise
                                    # (years, view counts, etc.).
                                    if 0.01 <= price_float <= 10000:
                                        snippet = f"π° Live Stock Data:\n"
                                        snippet += f"π’ {ticker}\n"
                                        snippet += f"π΅ Current Price: ${price_float:.2f}\n"
                                        snippet += f"π Source: {url}\n"
                                        snippet += f"π Retrieved: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

                                        return {
                                            'title': f'{ticker} - Live Stock Price',
                                            'snippet': snippet,
                                            'url': url,
                                            'source': 'Live Financial Data',
                                            'live_data': True
                                        }
                                except ValueError:
                                    # Non-numeric capture: try the next match.
                                    continue

                except Exception as e:
                    print(f"Failed to get data from {url}: {e}")
                    continue

        except Exception as e:
            print(f"Financial summary error: {e}")

        return None
|
|
|
|
|
    def _get_alternative_financial_data(self, blocked_url: str, query: str) -> Optional[str]:
        """Try to get financial data when the primary source is blocked.

        Re-runs ticker detection on *query* and, when a symbol is found,
        returns the snippet text from the live financial-data pipeline;
        ``None`` when no ticker is detected or no source produced data.

        NOTE(review): *blocked_url* is currently unused in the body; kept
        for call-site compatibility.
        """
        ticker = self._detect_ticker_symbol(query)
        if not ticker:
            return None

        # Reuse the full live-data pipeline; only the snippet text is needed.
        live_data = self._get_live_financial_data(query)
        if live_data and live_data[0].get('snippet'):
            return live_data[0]['snippet']

        return None
|
|
|
|
|
def _scrape_url_content(self, url: str, query: str) -> str: |
|
|
"""Scrape actual content from a URL and extract relevant information.""" |
|
|
|
|
|
|
|
|
strategies = [ |
|
|
self._scrape_with_basic_headers, |
|
|
self._scrape_with_mobile_headers, |
|
|
self._scrape_with_alternative_approach |
|
|
] |
|
|
|
|
|
for strategy in strategies: |
|
|
try: |
|
|
content = strategy(url) |
|
|
if content: |
|
|
|
|
|
return self._extract_relevant_info(content, query, url) |
|
|
except Exception as e: |
|
|
print(f"Strategy {strategy.__name__} failed for {url}: {e}") |
|
|
continue |
|
|
|
|
|
|
|
|
return f"Unable to access content from this source due to access restrictions. You can visit directly: {url}" |
|
|
|
|
|
def _scrape_with_basic_headers(self, url: str) -> str: |
|
|
"""Try scraping with basic browser headers.""" |
|
|
headers = { |
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', |
|
|
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', |
|
|
'Accept-Language': 'en-US,en;q=0.5', |
|
|
'Accept-Encoding': 'gzip, deflate', |
|
|
'Connection': 'keep-alive', |
|
|
'Upgrade-Insecure-Requests': '1', |
|
|
} |
|
|
|
|
|
response = requests.get(url, headers=headers, timeout=10) |
|
|
response.raise_for_status() |
|
|
|
|
|
return self._clean_html_content(response.text) |
|
|
|
|
|
def _scrape_with_mobile_headers(self, url: str) -> str: |
|
|
"""Try scraping with mobile browser headers (sometimes less blocked).""" |
|
|
headers = { |
|
|
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Mobile/15E148 Safari/604.1', |
|
|
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', |
|
|
'Accept-Language': 'en-US,en;q=0.9', |
|
|
'Accept-Encoding': 'gzip, deflate', |
|
|
'Connection': 'keep-alive', |
|
|
} |
|
|
|
|
|
response = requests.get(url, headers=headers, timeout=10) |
|
|
response.raise_for_status() |
|
|
|
|
|
return self._clean_html_content(response.text) |
|
|
|
|
|
def _scrape_with_alternative_approach(self, url: str) -> str: |
|
|
"""Try alternative scraping approach with different session.""" |
|
|
session = requests.Session() |
|
|
|
|
|
|
|
|
user_agents = [ |
|
|
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', |
|
|
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0', |
|
|
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' |
|
|
] |
|
|
|
|
|
import random |
|
|
headers = { |
|
|
'User-Agent': random.choice(user_agents), |
|
|
'Accept': 'text/html,application/xhtml+xml', |
|
|
'Accept-Language': 'en-US,en;q=0.9', |
|
|
'Cache-Control': 'no-cache', |
|
|
'Pragma': 'no-cache' |
|
|
} |
|
|
|
|
|
response = session.get(url, headers=headers, timeout=15, allow_redirects=True) |
|
|
response.raise_for_status() |
|
|
|
|
|
return self._clean_html_content(response.text) |
|
|
|
|
|
def _clean_html_content(self, html_content: str) -> str: |
|
|
"""Clean HTML content and extract text.""" |
|
|
soup = BeautifulSoup(html_content, 'html.parser') |
|
|
|
|
|
|
|
|
for element in soup(["script", "style", "nav", "footer", "header", "aside", "iframe", "noscript"]): |
|
|
element.decompose() |
|
|
|
|
|
|
|
|
text_content = soup.get_text() |
|
|
|
|
|
|
|
|
lines = (line.strip() for line in text_content.splitlines()) |
|
|
clean_text = ' '.join(line for line in lines if line and len(line) > 3) |
|
|
|
|
|
return clean_text |
|
|
|
|
|
def _extract_relevant_info(self, text: str, query: str, url: str) -> str: |
|
|
"""Extract information relevant to any query from website text.""" |
|
|
|
|
|
|
|
|
query_lower = query.lower() |
|
|
query_words = [word.strip('?.,!') for word in query_lower.split() if len(word) > 2] |
|
|
|
|
|
|
|
|
stop_words = {'what', 'how', 'when', 'where', 'why', 'who', 'which', 'the', 'and', 'are', 'is'} |
|
|
query_words = [word for word in query_words if word not in stop_words] |
|
|
|
|
|
if not query_words: |
|
|
return "Unable to extract relevant information." |
|
|
|
|
|
|
|
|
sentences = re.split(r'[.!?]+', text) |
|
|
relevant_info = [] |
|
|
|
|
|
|
|
|
scored_sentences = [] |
|
|
for sentence in sentences: |
|
|
sentence = sentence.strip() |
|
|
if 20 <= len(sentence) <= 300: |
|
|
score = self._score_sentence_relevance(sentence, query_words) |
|
|
if score > 0: |
|
|
scored_sentences.append((score, sentence)) |
|
|
|
|
|
|
|
|
scored_sentences.sort(key=lambda x: x[0], reverse=True) |
|
|
|
|
|
|
|
|
extracted_data = {} |
|
|
|
|
|
|
|
|
if self._is_numerical_query(query_lower): |
|
|
extracted_data.update(self._extract_numerical_info(text, query_words)) |
|
|
|
|
|
if self._is_date_time_query(query_lower): |
|
|
extracted_data.update(self._extract_date_time_info(text, query_words)) |
|
|
|
|
|
if self._is_definition_query(query_lower): |
|
|
extracted_data.update(self._extract_definition_info(text, query_words)) |
|
|
|
|
|
if self._is_how_to_query(query_lower): |
|
|
extracted_data.update(self._extract_how_to_info(text, query_words)) |
|
|
|
|
|
|
|
|
top_sentences = [sent[1] for sent in scored_sentences[:3]] |
|
|
if top_sentences: |
|
|
extracted_data['relevant_info'] = top_sentences |
|
|
|
|
|
|
|
|
return self._format_extracted_info(extracted_data, url, query) |
|
|
|
|
|
def _score_sentence_relevance(self, sentence: str, query_words: List[str]) -> int: |
|
|
"""Score a sentence based on how relevant it is to the query.""" |
|
|
sentence_lower = sentence.lower() |
|
|
score = 0 |
|
|
|
|
|
|
|
|
for word in query_words: |
|
|
if word in sentence_lower: |
|
|
score += 3 |
|
|
|
|
|
|
|
|
word_count = sum(1 for word in query_words if word in sentence_lower) |
|
|
if word_count > 1: |
|
|
score += word_count * 2 |
|
|
|
|
|
|
|
|
answer_indicators = ['is', 'are', 'was', 'were', 'can', 'will', 'has', 'have', 'according to', 'known as'] |
|
|
if any(indicator in sentence_lower for indicator in answer_indicators): |
|
|
score += 2 |
|
|
|
|
|
|
|
|
if len(sentence) > 200: |
|
|
score -= 1 |
|
|
|
|
|
return score |
|
|
|
|
|
def _is_numerical_query(self, query: str) -> bool: |
|
|
"""Check if query is asking for numerical information.""" |
|
|
numerical_keywords = ['price', 'cost', 'number', 'amount', 'count', 'total', 'rate', 'percentage', 'how much', 'how many'] |
|
|
return any(keyword in query for keyword in numerical_keywords) |
|
|
|
|
|
def _is_date_time_query(self, query: str) -> bool: |
|
|
"""Check if query is asking for date/time information.""" |
|
|
time_keywords = ['when', 'date', 'time', 'year', 'month', 'day', 'ago', 'since', 'until', 'before', 'after'] |
|
|
return any(keyword in query for keyword in time_keywords) |
|
|
|
|
|
def _is_definition_query(self, query: str) -> bool: |
|
|
"""Check if query is asking for a definition.""" |
|
|
definition_keywords = ['what is', 'what are', 'define', 'definition', 'meaning', 'means'] |
|
|
return any(keyword in query for keyword in definition_keywords) |
|
|
|
|
|
def _is_how_to_query(self, query: str) -> bool: |
|
|
"""Check if query is asking for instructions.""" |
|
|
how_to_keywords = ['how to', 'how do', 'how can', 'steps', 'instructions', 'guide', 'tutorial'] |
|
|
return any(keyword in query for keyword in how_to_keywords) |
|
|
|
|
|
def _extract_numerical_info(self, text: str, query_words: List[str]) -> Dict[str, Any]: |
|
|
"""Extract numerical information from text.""" |
|
|
numerical_info = {} |
|
|
|
|
|
|
|
|
patterns = [ |
|
|
r'\$(\d{1,4}(?:,\d{3})*(?:\.\d{2})?)', |
|
|
r'(\d{1,4}(?:,\d{3})*(?:\.\d{2})?)%', |
|
|
r'(\d{1,4}(?:,\d{3})*(?:\.\d{2})?)\s*(million|billion|trillion)', |
|
|
r'(\d{1,4}(?:,\d{3})*(?:\.\d{1,2})?)', |
|
|
] |
|
|
|
|
|
found_numbers = [] |
|
|
for pattern in patterns: |
|
|
matches = re.findall(pattern, text, re.IGNORECASE) |
|
|
for match in matches: |
|
|
if isinstance(match, tuple): |
|
|
found_numbers.append(' '.join(match)) |
|
|
else: |
|
|
found_numbers.append(match) |
|
|
|
|
|
if found_numbers: |
|
|
numerical_info['numbers'] = found_numbers[:5] |
|
|
|
|
|
return numerical_info |
|
|
|
|
|
def _extract_date_time_info(self, text: str, query_words: List[str]) -> Dict[str, Any]: |
|
|
"""Extract date and time information from text.""" |
|
|
date_info = {} |
|
|
|
|
|
|
|
|
date_patterns = [ |
|
|
r'\b(\d{1,2}\/\d{1,2}\/\d{4})\b', |
|
|
r'\b(\d{4}-\d{1,2}-\d{1,2})\b', |
|
|
r'\b(January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b', |
|
|
r'\b(\d{1,2}\s+(January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4})\b', |
|
|
] |
|
|
|
|
|
found_dates = [] |
|
|
for pattern in date_patterns: |
|
|
matches = re.findall(pattern, text, re.IGNORECASE) |
|
|
found_dates.extend(matches) |
|
|
|
|
|
if found_dates: |
|
|
date_info['dates'] = found_dates[:3] |
|
|
|
|
|
return date_info |
|
|
|
|
|
def _extract_definition_info(self, text: str, query_words: List[str]) -> Dict[str, Any]: |
|
|
"""Extract definition information from text.""" |
|
|
definition_info = {} |
|
|
|
|
|
|
|
|
for word in query_words: |
|
|
definition_patterns = [ |
|
|
f"{word} is (.*?)(?:\.|$)", |
|
|
f"{word} are (.*?)(?:\.|$)", |
|
|
f"{word} refers to (.*?)(?:\.|$)", |
|
|
f"{word} means (.*?)(?:\.|$)", |
|
|
] |
|
|
|
|
|
for pattern in definition_patterns: |
|
|
matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL) |
|
|
if matches: |
|
|
definition_info['definition'] = matches[0].strip()[:200] |
|
|
break |
|
|
|
|
|
if 'definition' in definition_info: |
|
|
break |
|
|
|
|
|
return definition_info |
|
|
|
|
|
def _extract_how_to_info(self, text: str, query_words: List[str]) -> Dict[str, Any]: |
|
|
"""Extract how-to/instructional information from text.""" |
|
|
how_to_info = {} |
|
|
|
|
|
|
|
|
step_patterns = [ |
|
|
r'(step \d+[:\.].*?)(?=step \d+|$)', |
|
|
r'(\d+\.\s+.*?)(?=\d+\.|$)', |
|
|
r'(first.*?)(?=second|then|next|$)', |
|
|
r'(then.*?)(?=then|next|finally|$)', |
|
|
] |
|
|
|
|
|
steps = [] |
|
|
for pattern in step_patterns: |
|
|
matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL) |
|
|
steps.extend([step.strip()[:150] for step in matches]) |
|
|
|
|
|
if steps: |
|
|
how_to_info['steps'] = steps[:5] |
|
|
|
|
|
return how_to_info |
|
|
|
|
|
def _format_extracted_info(self, extracted_data: Dict[str, Any], url: str, query: str) -> str: |
|
|
"""Format the extracted information into a readable response.""" |
|
|
if not extracted_data: |
|
|
return "Unable to extract specific information from this source." |
|
|
|
|
|
source_name = urlparse(url).netloc.replace('www.', '') |
|
|
response_parts = [f"π Live data from {source_name}:"] |
|
|
|
|
|
|
|
|
if 'definition' in extracted_data: |
|
|
response_parts.append(f"π‘ {extracted_data['definition']}") |
|
|
|
|
|
|
|
|
if 'numbers' in extracted_data: |
|
|
numbers_text = ", ".join(extracted_data['numbers'][:3]) |
|
|
response_parts.append(f"π’ Key numbers: {numbers_text}") |
|
|
|
|
|
|
|
|
if 'dates' in extracted_data: |
|
|
dates_text = ", ".join(str(date) for date in extracted_data['dates'][:2]) |
|
|
response_parts.append(f"π
Dates: {dates_text}") |
|
|
|
|
|
|
|
|
if 'steps' in extracted_data: |
|
|
steps_text = " | ".join(extracted_data['steps'][:2]) |
|
|
response_parts.append(f"π Steps: {steps_text}") |
|
|
|
|
|
|
|
|
if 'relevant_info' in extracted_data: |
|
|
for i, info in enumerate(extracted_data['relevant_info'][:2], 1): |
|
|
response_parts.append(f"βΉοΈ {info}") |
|
|
|
|
|
return "\n".join(response_parts) |
|
|
|
|
|
def _extract_stock_info(self, text: str, url: str) -> str: |
|
|
"""Extract stock price and related information from website text.""" |
|
|
|
|
|
|
|
|
price_patterns = [ |
|
|
r'\$(\d{1,4}(?:,\d{3})*(?:\.\d{2})?)', |
|
|
r'(\d{1,4}(?:,\d{3})*\.\d{2})\s*USD', |
|
|
r'Price[:\s]*\$?(\d{1,4}(?:,\d{3})*(?:\.\d{2})?)', |
|
|
r'(\d{1,4}(?:,\d{3})*\.\d{2})', |
|
|
] |
|
|
|
|
|
|
|
|
change_patterns = [ |
|
|
r'([\+\-]\$?\d+(?:\.\d{2})?)\s*\(([\+\-]?\d+(?:\.\d{2})?\%?)\)', |
|
|
r'([\+\-]\d+(?:\.\d{2})?\%)', |
|
|
r'(up|down)\s+(\d+(?:\.\d{2})?\%?)', |
|
|
] |
|
|
|
|
|
extracted_info = [] |
|
|
|
|
|
|
|
|
found_prices = [] |
|
|
for pattern in price_patterns: |
|
|
matches = re.findall(pattern, text, re.IGNORECASE) |
|
|
for match in matches: |
|
|
try: |
|
|
if isinstance(match, tuple): |
|
|
price_str = match[0] if match[0] else match[1] |
|
|
else: |
|
|
price_str = match |
|
|
|
|
|
clean_price = price_str.replace(',', '').replace('$', '') |
|
|
price_val = float(clean_price) |
|
|
|
|
|
|
|
|
if 0.01 <= price_val <= 10000: |
|
|
found_prices.append(f"${price_str}") |
|
|
except: |
|
|
continue |
|
|
|
|
|
if found_prices: |
|
|
|
|
|
extracted_info.append(f"π° Current Price: {found_prices[0]}") |
|
|
|
|
|
|
|
|
for pattern in change_patterns: |
|
|
matches = re.findall(pattern, text, re.IGNORECASE) |
|
|
if matches: |
|
|
match = matches[0] |
|
|
if isinstance(match, tuple) and len(match) == 2: |
|
|
if match[0].lower() in ['up', 'down']: |
|
|
change_text = f"π Change: {match[0]} {match[1]}" |
|
|
else: |
|
|
change_text = f"π Change: {match[0]} ({match[1]})" |
|
|
else: |
|
|
change_text = f"π Change: {match}" |
|
|
extracted_info.append(change_text) |
|
|
break |
|
|
|
|
|
|
|
|
sentences = text.split('.') |
|
|
for sentence in sentences[:10]: |
|
|
if any(word in sentence.lower() for word in ['nvidia', 'nvda', 'stock', 'share']): |
|
|
clean_sentence = sentence.strip() |
|
|
if 20 < len(clean_sentence) < 200: |
|
|
extracted_info.append(f"βΉοΈ {clean_sentence}") |
|
|
break |
|
|
|
|
|
if extracted_info: |
|
|
source_name = urlparse(url).netloc.replace('www.', '') |
|
|
return f"π Live data from {source_name}:\n" + "\n".join(extracted_info) |
|
|
|
|
|
return "Unable to extract specific stock data from this source." |
|
|
|
|
|
def _extract_general_info(self, text: str, query: str) -> str: |
|
|
"""Extract general information relevant to the query.""" |
|
|
|
|
|
query_words = query.lower().split() |
|
|
relevant_sentences = [] |
|
|
|
|
|
sentences = text.split('.') |
|
|
for sentence in sentences: |
|
|
sentence = sentence.strip() |
|
|
if (len(sentence) > 30 and |
|
|
any(word in sentence.lower() for word in query_words) and |
|
|
len(relevant_sentences) < 3): |
|
|
relevant_sentences.append(sentence) |
|
|
|
|
|
if relevant_sentences: |
|
|
return " ".join(relevant_sentences[:2]) |
|
|
|
|
|
return "Relevant information found but unable to extract specific details." |
|
|
|
|
|
def _clean_url(self, url: str) -> str: |
|
|
"""Clean DuckDuckGo redirect URLs.""" |
|
|
if url.startswith('//duckduckgo.com/l/?uddg='): |
|
|
try: |
|
|
from urllib.parse import unquote |
|
|
encoded = url.replace('//duckduckgo.com/l/?uddg=', '').split('&')[0] |
|
|
return unquote(encoded) |
|
|
except: |
|
|
pass |
|
|
return url |
|
|
|
|
|
def _get_source_name(self, url: str) -> str: |
|
|
"""Extract readable source name from URL.""" |
|
|
try: |
|
|
domain = urlparse(url).netloc.replace('www.', '') |
|
|
|
|
|
if 'wikipedia' in domain: |
|
|
return 'Wikipedia' |
|
|
elif 'github' in domain: |
|
|
return 'GitHub' |
|
|
elif 'stackoverflow' in domain: |
|
|
return 'Stack Overflow' |
|
|
elif 'reddit' in domain: |
|
|
return 'Reddit' |
|
|
elif 'youtube' in domain: |
|
|
return 'YouTube' |
|
|
else: |
|
|
return domain.title() |
|
|
except: |
|
|
return 'Web Source' |
|
|
|
|
|
def _generate_summary(self, query: str, results: List[Dict[str, Any]]) -> str: |
|
|
"""Generate formatted summary with results and sources.""" |
|
|
if not results: |
|
|
return f"# π No Results Found\n\nNo results found for: *{query}*\n\nTry rephrasing your search query." |
|
|
|
|
|
parts = [f"# π Search Results for: *{query}*", ""] |
|
|
|
|
|
|
|
|
for i, result in enumerate(results, 1): |
|
|
title = result.get('title', 'Unknown') |
|
|
url = result.get('url', '#') |
|
|
source = result.get('source', 'Web') |
|
|
snippet = result.get('snippet', '') |
|
|
|
|
|
parts.append(f"## {i}. {title}") |
|
|
|
|
|
if snippet: |
|
|
parts.append(f"{snippet}") |
|
|
parts.append("") |
|
|
|
|
|
parts.append(f"**Source:** [{source}]({url})") |
|
|
parts.append("---") |
|
|
|
|
|
|
|
|
parts.append(f"*Found {len(results)} results β’ Real-time web search*") |
|
|
|
|
|
return "\n".join(parts) |