""" Real Web Search Tools - Updated for duckduckgo-search v8.1.1 (2025) HASHIRU 6.1 - Internet access with real search capabilities """ import asyncio import json import httpx from typing import Dict, Any, List, Optional from datetime import datetime from pathlib import Path # Import DuckDuckGo search (latest version) try: from duckduckgo_search import DDGS DDG_AVAILABLE = True print("✅ DuckDuckGo search library v8.1.1+ loaded") except ImportError: DDG_AVAILABLE = False print("⚠️ DuckDuckGo search not available. Install: pip install duckduckgo-search") async def handle_search(query: str, max_results: int = 10) -> Dict[str, Any]: """ Real internet search using DuckDuckGo (v8.1.1+ API) Usage: /search [max_results] """ if not query.strip(): return {"error": "Search query cannot be empty"} if not DDG_AVAILABLE: return {"error": "DuckDuckGo search library not installed. Run: pip install duckduckgo-search"} try: # Use context manager for DDGS (recommended in v8+) with DDGS() as ddgs: # Perform search - new API doesn't use max_results in function call search_results = ddgs.text( keywords=query, region="us-en", safesearch="moderate" ) # Limit results manually (new API behavior) results = [] for i, result in enumerate(search_results): if i >= max_results: break results.append({ "position": i + 1, "title": result.get("title", ""), "url": result.get("href", ""), "snippet": result.get("body", ""), "domain": result.get("href", "").split('/')[2] if result.get("href") else "" }) # Save search results to free path search_data = { "query": query, "timestamp": datetime.now().isoformat(), "total_results": len(results), "results": results, "api_version": "duckduckgo-search v8.1.1+" } # Save to free project folder free_path = Path(r"C:\meu_projeto_livre") free_path.mkdir(exist_ok=True) search_file = free_path / f"search_{query.replace(' ', '_')[:20]}_{int(datetime.now().timestamp())}.json" with open(search_file, 'w', encoding='utf-8') as f: json.dump(search_data, f, indent=2, ensure_ascii=False) return { "success": True, "query": query, "results": results, "total_found": len(results), "search_engine": "DuckDuckGo v8.1.1+", "saved_to": str(search_file), "timestamp": datetime.now().isoformat() } except Exception as e: return {"error": f"Search failed: {str(e)}"} async def handle_news(topic: str = "technology", max_results: int = 5) -> Dict[str, Any]: """ Search for news using updated API Usage: /news [topic] [max_results] """ if not DDG_AVAILABLE: return {"error": "DuckDuckGo search library not installed"} try: with DDGS() as ddgs: # News search with new API news_results = ddgs.news( keywords=topic, region="us-en", safesearch="moderate" ) articles = [] for i, article in enumerate(news_results): if i >= max_results: break articles.append({ "position": i + 1, "title": article.get("title", ""), "url": article.get("url", ""), "source": article.get("source", ""), "published": article.get("date", ""), "snippet": article.get("body", "") }) # Save news to free path news_data = { "topic": topic, "timestamp": datetime.now().isoformat(), "articles": articles, "total_articles": len(articles), "api_version": "duckduckgo-search v8.1.1+" } free_path = Path(r"C:\meu_projeto_livre") news_file = free_path / f"news_{topic.replace(' ', '_')[:15]}_{int(datetime.now().timestamp())}.json" with open(news_file, 'w', encoding='utf-8') as f: json.dump(news_data, f, indent=2, ensure_ascii=False) return { "success": True, "topic": topic, "articles": articles, "total_found": len(articles), "saved_to": str(news_file), "timestamp": datetime.now().isoformat() } except Exception as e: return {"error": f"News search failed: {str(e)}"} async def handle_images(query: str, max_results: int = 10) -> Dict[str, Any]: """ Search for images using updated API Usage: /images [max_results] """ if not query.strip(): return {"error": "Image search query cannot be empty"} if not DDG_AVAILABLE: return {"error": "DuckDuckGo search library not installed"} try: with DDGS() as ddgs: # Image search with new API image_results = ddgs.images( keywords=query, region="us-en", safesearch="moderate" ) images = [] for i, img in enumerate(image_results): if i >= max_results: break images.append({ "position": i + 1, "title": img.get("title", ""), "image_url": img.get("image", ""), "thumbnail": img.get("thumbnail", ""), "source_url": img.get("url", ""), "width": img.get("width", 0), "height": img.get("height", 0) }) return { "success": True, "query": query, "images": images, "total_found": len(images), "search_engine": "DuckDuckGo Images v8.1.1+", "timestamp": datetime.now().isoformat() } except Exception as e: return {"error": f"Image search failed: {str(e)}"} async def handle_videos(query: str, max_results: int = 10) -> Dict[str, Any]: """ Search for videos using updated API Usage: /videos [max_results] """ if not query.strip(): return {"error": "Video search query cannot be empty"} if not DDG_AVAILABLE: return {"error": "DuckDuckGo search library not installed"} try: with DDGS() as ddgs: # Video search with new API video_results = ddgs.videos( keywords=query, region="us-en", safesearch="moderate" ) videos = [] for i, video in enumerate(video_results): if i >= max_results: break videos.append({ "position": i + 1, "title": video.get("title", ""), "video_url": video.get("content", ""), "thumbnail": video.get("image", ""), "duration": video.get("duration", ""), "published": video.get("published", ""), "publisher": video.get("publisher", "") }) return { "success": True, "query": query, "videos": videos, "total_found": len(videos), "search_engine": "DuckDuckGo Videos v8.1.1+", "timestamp": datetime.now().isoformat() } except Exception as e: return {"error": f"Video search failed: {str(e)}"} async def handle_instant_answer(query: str) -> Dict[str, Any]: """ Get instant answers/facts from DuckDuckGo Usage: /instant """ if not query.strip(): return {"error": "Query cannot be empty"} if not DDG_AVAILABLE: return {"error": "DuckDuckGo search library not installed"} try: with DDGS() as ddgs: # Try to get instant answer answer_results = ddgs.answers(query) answers = [] for answer in answer_results: answers.append({ "text": answer.get("text", ""), "url": answer.get("url", ""), "source": answer.get("source", "") }) break # Usually just one answer if not answers: return {"error": "No instant answer found", "query": query} return { "success": True, "query": query, "instant_answer": answers[0], "search_engine": "DuckDuckGo Instant Answers", "timestamp": datetime.now().isoformat() } except Exception as e: return {"error": f"Instant answer failed: {str(e)}"} async def handle_browse(url: str) -> Dict[str, Any]: """ Fetch content from a specific URL Usage: /browse """ if not url.strip(): return {"error": "URL cannot be empty"} # Add https:// if no protocol if not url.startswith(('http://', 'https://')): url = f"https://{url}" try: headers = { "User-Agent": "HASHIRU-6.1-Agent", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Accept-Encoding": "gzip, deflate", "Connection": "keep-alive", } async with httpx.AsyncClient(timeout=30.0, headers=headers) as client: response = await client.get(url) response.raise_for_status() content = response.text content_preview = content[:2000] + "..." if len(content) > 2000 else content # Save content to free path free_path = Path(r"C:\meu_projeto_livre") domain = url.split('/')[2] if '/' in url else url.replace(':', '_') content_file = free_path / f"browsed_{domain}_{int(datetime.now().timestamp())}.html" with open(content_file, 'w', encoding='utf-8') as f: f.write(content) return { "success": True, "url": url, "status_code": response.status_code, "content_length": len(content), "content_preview": content_preview, "saved_to": str(content_file), "timestamp": datetime.now().isoformat() } except Exception as e: return {"error": f"Failed to browse URL: {str(e)}"} async def handle_research(topic: str, depth: str = "basic") -> Dict[str, Any]: """ Comprehensive research using multiple search types Usage: /research [basic|detailed] """ if not topic.strip(): return {"error": "Research topic cannot be empty"} if not DDG_AVAILABLE: return {"error": "DuckDuckGo search library not installed"} try: research_data = { "topic": topic, "depth": depth, "timestamp": datetime.now().isoformat(), "web_results": [], "news_results": [], "instant_answers": [], "api_version": "duckduckgo-search v8.1.1+" } with DDGS() as ddgs: # 1. Web search web_results = ddgs.text(keywords=topic, region="us-en", safesearch="moderate") for i, result in enumerate(web_results): if i >= 5: # Limit for research break research_data["web_results"].append({ "title": result.get("title", ""), "url": result.get("href", ""), "snippet": result.get("body", "") }) # 2. News search try: news_results = ddgs.news(keywords=topic, region="us-en", safesearch="moderate") for i, news in enumerate(news_results): if i >= 3: # Limit for research break research_data["news_results"].append({ "title": news.get("title", ""), "url": news.get("url", ""), "source": news.get("source", ""), "date": news.get("date", "") }) except: pass # News might not be available for all topics # 3. Try instant answers try: answers = ddgs.answers(topic) for answer in answers: research_data["instant_answers"].append({ "text": answer.get("text", ""), "url": answer.get("url", ""), "source": answer.get("source", "") }) break # Usually just one answer except: pass # Instant answers might not be available # Save comprehensive research free_path = Path(r"C:\meu_projeto_livre") research_file = free_path / f"research_{topic.replace(' ', '_')[:20]}_{int(datetime.now().timestamp())}.json" with open(research_file, 'w', encoding='utf-8') as f: json.dump(research_data, f, indent=2, ensure_ascii=False) # Generate summary total_sources = len(research_data["web_results"]) + len(research_data["news_results"]) has_instant = len(research_data["instant_answers"]) > 0 return { "success": True, "topic": topic, "depth": depth, "total_sources": total_sources, "has_instant_answer": has_instant, "saved_to": str(research_file), "summary": f"Research complete: {total_sources} sources found for '{topic}'", "timestamp": datetime.now().isoformat() } except Exception as e: return {"error": f"Research failed: {str(e)}"} # Updated command registry WEB_COMMANDS = { "/search": handle_search, "/news": handle_news, "/images": handle_images, "/videos": handle_videos, "/instant": handle_instant_answer, "/browse": handle_browse, "/research": handle_research, } # Register commands function def register_web_commands(): """Register web commands in the main system""" try: from tools.registry import register_handler for command, handler in WEB_COMMANDS.items(): register_handler(command, handler) print("✅ Real web search commands registered (v8.1.1+):") for cmd in WEB_COMMANDS.keys(): print(f" {cmd}") if DDG_AVAILABLE: print("🌐 DuckDuckGo search engine ready (latest API)!") else: print("⚠️ Install duckduckgo-search for full functionality") except Exception as e: print(f"⚠️ Error registering web commands: {e}") # Auto-register on import register_web_commands()