Spaces:
Sleeping
Sleeping
hugh2023
Add multi-modal agent system with media analysis, web scraping, and enhanced configuration management
adec1cb | """ | |
| 多模态智能体工具模块 | |
| """ | |
| import os | |
| import json | |
| import requests | |
| import tempfile | |
| import ast | |
| import subprocess | |
| import sys | |
| from typing import Dict, List, Any, Optional | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import torch | |
| from transformers import pipeline | |
| from langchain_core.tools import tool | |
| from langchain_community.tools import DuckDuckGoSearchRun | |
| from config import Config | |
| # PDF处理相关导入 | |
| try: | |
| import PyPDF2 | |
| import fitz # PyMuPDF | |
| from pdf2image import convert_from_path | |
| PDF_AVAILABLE = True | |
| except ImportError: | |
| PDF_AVAILABLE = False | |
| print("⚠️ PDF处理功能需要安装: pip install PyPDF2 PyMuPDF pdf2image") | |
| # 网页处理相关导入 | |
| try: | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import urllib.parse | |
| from urllib.parse import urljoin, urlparse | |
| import re | |
| import time | |
| WEB_AVAILABLE = True | |
| except ImportError: | |
| WEB_AVAILABLE = False | |
| print("⚠️ 网页处理功能需要安装: pip install beautifulsoup4 requests") | |
| # YouTube处理相关导入 | |
| try: | |
| from pytube import YouTube | |
| YOUTUBE_AVAILABLE = True | |
| YT_DLP_AVAILABLE = False | |
| try: | |
| import yt_dlp | |
| YT_DLP_AVAILABLE = True | |
| except ImportError: | |
| pass | |
| except ImportError: | |
| YOUTUBE_AVAILABLE = False | |
| YT_DLP_AVAILABLE = False | |
| print("⚠️ YouTube处理功能需要安装: pip install pytube") | |
| # 音频处理相关导入 | |
| try: | |
| import speech_recognition as sr | |
| from pydub import AudioSegment | |
| AUDIO_PROCESSING_AVAILABLE = True | |
| except ImportError: | |
| AUDIO_PROCESSING_AVAILABLE = False | |
| print("⚠️ 音频处理功能需要安装: pip install SpeechRecognition pydub") | |
| # Wikipedia处理相关导入 | |
| try: | |
| import wikipediaapi | |
| import requests | |
| from bs4 import BeautifulSoup | |
| WIKIPEDIA_AVAILABLE = True | |
| except ImportError: | |
| WIKIPEDIA_AVAILABLE = False | |
| print("⚠️ Wikipedia处理功能需要安装: pip install wikipedia-api requests beautifulsoup4") | |
class WebTools:
    """Helpers for downloading and inspecting web pages.

    Every public method is a static method, so it can be called either on the
    class or on an instance. Each one performs a single HTTP GET and reports
    failures through its return value (an ``"error"`` entry or a message
    string) instead of raising.
    """

    # Browser-like User-Agent sent with every request.
    _HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    # Request timeout in seconds.
    _TIMEOUT = 30
    # Message returned when the optional web dependencies failed to import.
    _NOT_INSTALLED = "网页处理功能未安装,请运行: pip install beautifulsoup4 requests"

    @staticmethod
    def _load(url: str):
        """Download *url* and return ``(response, soup)``; raises on HTTP errors."""
        response = requests.get(url, headers=WebTools._HEADERS, timeout=WebTools._TIMEOUT)
        response.raise_for_status()
        return response, BeautifulSoup(response.content, 'html.parser')

    @staticmethod
    def _visible_text(soup, drop_tags) -> str:
        """Remove *drop_tags* from *soup* in place and return its whitespace-collapsed text."""
        for tag in soup(list(drop_tags)):
            tag.decompose()
        lines = (line.strip() for line in soup.get_text().splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        return ' '.join(chunk for chunk in chunks if chunk)

    @staticmethod
    def fetch_webpage_content(url: str) -> Dict[str, Any]:
        """Fetch a page and return its title, text, links, images and metadata.

        Args:
            url: Address of the page to download.

        Returns:
            A dict with the page title, the first 5000 characters of text, up
            to 20 links and 10 images (plus total counts), ``<meta>`` data and
            response details; or ``{"error": message}`` on any failure.
        """
        try:
            if not WEB_AVAILABLE:
                return {"error": WebTools._NOT_INSTALLED}

            response, soup = WebTools._load(url)

            title = soup.find('title')
            title_text = title.get_text().strip() if title else "无标题"

            # Scripts and styles carry no readable content.
            text_content = WebTools._visible_text(soup, ("script", "style"))

            # Only anchors that have both a target and visible text are kept.
            links = []
            for anchor in soup.find_all('a', href=True):
                href = anchor.get('href')
                label = anchor.get_text().strip()
                if href and label:
                    links.append({
                        'url': urljoin(url, href),
                        'text': label[:100],
                    })

            images = [
                {
                    'url': urljoin(url, img.get('src')),
                    'alt': img.get('alt', '')[:100],
                }
                for img in soup.find_all('img', src=True)
                if img.get('src')
            ]

            # <meta> tags are keyed by ``name`` or, failing that, ``property``.
            meta_data = {}
            for meta in soup.find_all('meta'):
                name = meta.get('name') or meta.get('property')
                content = meta.get('content')
                if name and content:
                    meta_data[name] = content

            return {
                'url': url,
                'title': title_text,
                'text_content': text_content[:5000],
                'links_count': len(links),
                'images_count': len(images),
                'links': links[:20],
                'images': images[:10],
                'meta_data': meta_data,
                'status_code': response.status_code,
                'content_type': response.headers.get('content-type', ''),
                'encoding': response.encoding,
            }
        except Exception as e:
            return {"error": f"网页内容获取失败: {str(e)}"}

    @staticmethod
    def extract_text_from_webpage(url: str) -> str:
        """Return the page's plain text, or a message string on failure.

        Script, style, nav, footer and header elements are removed before the
        text is collected.
        """
        try:
            if not WEB_AVAILABLE:
                return WebTools._NOT_INSTALLED

            _, soup = WebTools._load(url)
            text = WebTools._visible_text(
                soup, ('script', 'style', 'nav', 'footer', 'header')
            )
            return text if text.strip() else "网页中没有找到文本内容"
        except Exception as e:
            return f"文本提取失败: {str(e)}"

    @staticmethod
    def analyze_webpage_structure(url: str) -> Dict[str, Any]:
        """Summarise the page layout: headings, sections, forms, tables, lists.

        Returns:
            A dict with heading counts per level (h1-h6) and short descriptions
            of the first few content sections, forms, tables and lists; or
            ``{"error": message}`` on failure.
        """
        try:
            if not WEB_AVAILABLE:
                return {"error": WebTools._NOT_INSTALLED}

            _, soup = WebTools._load(url)

            title = soup.find('title')
            structure = {
                'url': url,
                'title': title.get_text().strip() if title else "无标题",
                'headings': {},
                'sections': [],
                'forms': [],
                'tables': [],
                'lists': [],
            }

            for level in range(1, 7):
                structure['headings'][f'h{level}'] = len(soup.find_all(f'h{level}'))

            # Containers whose class name suggests main content (first 5 only).
            main_sections = soup.find_all(
                ['main', 'article', 'section', 'div'],
                class_=re.compile(r'main|content|article|post'),
            )
            for section in main_sections[:5]:
                structure['sections'].append({
                    'tag': section.name,
                    'class': section.get('class', []),
                    'text_preview': section.get_text().strip()[:200],
                })

            for form in soup.find_all('form')[:3]:
                structure['forms'].append({
                    'action': form.get('action', ''),
                    'method': form.get('method', ''),
                    'input_count': len(form.find_all('input')),
                })

            for table in soup.find_all('table')[:3]:
                structure['tables'].append({
                    'row_count': len(table.find_all('tr')),
                    'has_header': bool(table.find('th')),
                })

            for lst in soup.find_all(['ul', 'ol'])[:5]:
                structure['lists'].append({
                    'type': lst.name,
                    'item_count': len(lst.find_all('li')),
                })

            return structure
        except Exception as e:
            return {"error": f"网页结构分析失败: {str(e)}"}

    @staticmethod
    def search_content_in_webpage(url: str, search_term: str) -> List[Dict[str, Any]]:
        """Find lines of the page text that contain *search_term* (case-insensitive).

        Args:
            url: Address of the page to search.
            search_term: Text to look for. An empty term matches nothing.

        Returns:
            Up to 10 matches, each with a 1-based line number, the matched line
            and one line of context on either side; or a one-element list with
            an ``"error"`` dict on failure.
        """
        # An empty needle is contained in every line; treat it as "no matches"
        # instead of returning the first ten lines of the page.
        if not search_term:
            return []

        try:
            if not WEB_AVAILABLE:
                return [{"error": WebTools._NOT_INSTALLED}]

            _, soup = WebTools._load(url)
            for tag in soup(["script", "style"]):
                tag.decompose()

            lines = soup.get_text().split('\n')
            needle = search_term.lower()

            search_results = []
            for index, line in enumerate(lines):
                if needle not in line.lower():
                    continue
                context = '\n'.join(lines[max(0, index - 1):min(len(lines), index + 2)])
                search_results.append({
                    'line_number': index + 1,
                    'matched_text': line.strip(),
                    'context': context.strip(),
                })
                if len(search_results) >= 10:
                    break
            return search_results
        except Exception as e:
            return [{"error": f"网页内容搜索失败: {str(e)}"}]

    @staticmethod
    def extract_links_from_webpage(url: str) -> List[Dict[str, str]]:
        """Return up to 50 links with absolute URL, text, domain and path.

        Anchors without visible text are skipped. On failure a one-element
        list holding an ``"error"`` dict is returned.
        """
        try:
            if not WEB_AVAILABLE:
                return [{"error": WebTools._NOT_INSTALLED}]

            _, soup = WebTools._load(url)

            links = []
            for anchor in soup.find_all('a', href=True):
                href = anchor.get('href')
                label = anchor.get_text().strip()
                if href and label:
                    full_url = urljoin(url, href)
                    parsed_url = urlparse(full_url)
                    links.append({
                        'url': full_url,
                        'text': label[:100],
                        'domain': parsed_url.netloc,
                        'path': parsed_url.path,
                    })
            return links[:50]
        except Exception as e:
            return [{"error": f"链接提取失败: {str(e)}"}]

    @staticmethod
    def summarize_webpage_content(url: str) -> str:
        """Ask the configured OpenAI chat model to summarise the page.

        The page is fetched with :meth:`fetch_webpage_content`; at most the
        first 4000 characters of its text are sent to the model.

        Returns:
            The model's answer, or a message string on failure.
        """
        try:
            if not WEB_AVAILABLE:
                return WebTools._NOT_INSTALLED

            content_result = WebTools.fetch_webpage_content(url)
            if "error" in content_result:
                return content_result["error"]

            text_content = content_result.get('text_content', '')
            if not text_content:
                return "网页中没有找到可总结的内容"

            # Imported lazily so the rest of the class works without langchain_openai.
            from langchain_openai import ChatOpenAI
            from langchain_core.messages import HumanMessage

            llm = ChatOpenAI(
                model=Config.OPENAI_MODEL,
                temperature=0.3,
                api_key=Config.OPENAI_API_KEY
            )

            # Long pages are truncated (not chunked) to bound the prompt size.
            if len(text_content) > 4000:
                text_content = text_content[:4000] + "..."

            prompt = (
                "\n"
                "请总结以下网页的主要内容:\n"
                f"标题: {content_result.get('title', '无标题')}\n"
                f"URL: {url}\n"
                "内容:\n"
                f"{text_content}\n"
                "请提供:\n"
                "1. 网页的主要主题\n"
                "2. 关键信息点\n"
                "3. 重要内容摘要\n"
                "4. 网页类型和用途\n"
            )
            response = llm.invoke([HumanMessage(content=prompt)])
            return response.content
        except Exception as e:
            return f"网页内容总结失败: {str(e)}"

    @staticmethod
    def check_webpage_accessibility(url: str) -> Dict[str, Any]:
        """Run a few simple accessibility heuristics against the page.

        Checks for a non-empty title, images without ``alt``, links without
        text, text-like form inputs without a matching ``<label for=...>``,
        and the presence of any ``<style>`` tag.

        Returns:
            A report dict with ``issues`` and ``recommendations`` lists, the
            HTTP status code and the response time; or ``{"error": message}``.
        """
        try:
            if not WEB_AVAILABLE:
                return {"error": WebTools._NOT_INSTALLED}

            response, soup = WebTools._load(url)

            issues = []
            recommendations = []
            accessibility_report = {
                'url': url,
                'status_code': response.status_code,
                'load_time': response.elapsed.total_seconds(),
                'issues': issues,
                'recommendations': recommendations,
            }

            title = soup.find('title')
            if not title or not title.get_text().strip():
                issues.append("缺少页面标题")
                recommendations.append("添加有意义的页面标题")

            images_without_alt = [img for img in soup.find_all('img') if not img.get('alt')]
            if images_without_alt:
                issues.append(f"发现 {len(images_without_alt)} 张图片缺少alt属性")
                recommendations.append("为所有图片添加alt属性")

            empty_links = [
                link for link in soup.find_all('a', href=True)
                if not link.get_text().strip()
            ]
            if empty_links:
                issues.append(f"发现 {len(empty_links)} 个空链接")
                recommendations.append("为所有链接添加描述性文本")

            # At most one issue is reported per form: stop at the first
            # unlabeled text-like input.
            for form in soup.find_all('form'):
                for input_field in form.find_all('input'):
                    if input_field.get('type') not in ['text', 'email', 'password']:
                        continue
                    field_id = input_field.get('id')
                    if not field_id or not soup.find('label', {'for': field_id}):
                        issues.append("表单输入字段缺少标签")
                        recommendations.append("为表单字段添加label标签")
                        break

            # Simplified stand-in for a contrast check: only notes missing <style>.
            if not soup.find_all('style'):
                recommendations.append("考虑添加CSS样式以提高可读性")

            return accessibility_report
        except Exception as e:
            return {"error": f"可访问性检查失败: {str(e)}"}
class PDFTools:
    """Helpers for downloading, reading and summarising PDF files.

    Every public method is a static method and reports failures through its
    return value (a message string or an ``"error"`` entry) instead of raising.
    """

    # Message returned when the optional PDF dependencies failed to import.
    _NOT_INSTALLED = "PDF处理功能未安装,请运行: pip install PyPDF2 PyMuPDF pdf2image"

    @staticmethod
    def _read_all_text(pdf_path: str) -> str:
        """Concatenate the text of every page; the document is always closed."""
        doc = fitz.open(pdf_path)
        try:
            return "".join(
                doc.load_page(page_num).get_text() for page_num in range(len(doc))
            )
        finally:
            doc.close()

    @staticmethod
    def download_pdf_from_url(url: str) -> str:
        """Download *url* into a new temporary ``.pdf`` file.

        Returns:
            The path of the downloaded file, or a message string on failure.
            The caller is responsible for deleting the file.
        """
        temp_path = None
        try:
            if not PDF_AVAILABLE:
                return PDFTools._NOT_INSTALLED

            response = requests.get(url, stream=True, timeout=30)
            response.raise_for_status()

            # NamedTemporaryFile creates the file atomically; tempfile.mktemp
            # only returns a name and is open to a race.
            with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as f:
                temp_path = f.name
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            return temp_path
        except Exception as e:
            # Do not leave a partial download behind.
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass  # best-effort cleanup; the original error is reported below
            return f"PDF下载失败: {str(e)}"

    @staticmethod
    def extract_text_from_pdf(pdf_path: str) -> str:
        """Return the text of all pages (via PyMuPDF), or a message string."""
        try:
            if not PDF_AVAILABLE:
                return PDFTools._NOT_INSTALLED

            text = PDFTools._read_all_text(pdf_path)
            return text if text.strip() else "PDF中没有找到文本内容"
        except Exception as e:
            return f"PDF文本提取失败: {str(e)}"

    @staticmethod
    def extract_images_from_pdf(pdf_path: str) -> List[str]:
        """Render every page to a temporary JPEG (200 dpi) and return the paths.

        On failure a one-element list containing a message string is returned.
        The caller is responsible for deleting the generated files.
        """
        try:
            if not PDF_AVAILABLE:
                return [PDFTools._NOT_INSTALLED]

            image_paths = []
            for index, image in enumerate(convert_from_path(pdf_path, dpi=200)):
                # Reserve a unique file first, then let PIL write into it.
                with tempfile.NamedTemporaryFile(
                    suffix=f'_page_{index + 1}.jpg', delete=False
                ) as handle:
                    temp_path = handle.name
                image.save(temp_path, 'JPEG')
                image_paths.append(temp_path)
            return image_paths
        except Exception as e:
            return [f"PDF图像提取失败: {str(e)}"]

    @staticmethod
    def analyze_pdf_structure(pdf_path: str) -> Dict[str, Any]:
        """Return document metadata and per-page text statistics (via PyPDF2).

        Returns:
            A dict with the page count, metadata fields (``'Unknown'`` when
            absent) and a ``pages_info`` list; or ``{"error": message}``.
        """
        try:
            if not PDF_AVAILABLE:
                return {"error": PDFTools._NOT_INSTALLED}

            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)

                # ``metadata`` is None for PDFs without an info dictionary.
                metadata = pdf_reader.metadata or {}
                info = {
                    "page_count": len(pdf_reader.pages),
                    "title": metadata.get('/Title', 'Unknown'),
                    "author": metadata.get('/Author', 'Unknown'),
                    "subject": metadata.get('/Subject', 'Unknown'),
                    "creator": metadata.get('/Creator', 'Unknown'),
                    "producer": metadata.get('/Producer', 'Unknown'),
                    "creation_date": metadata.get('/CreationDate', 'Unknown'),
                    "modification_date": metadata.get('/ModDate', 'Unknown'),
                }

                pages_info = []
                for index, page in enumerate(pdf_reader.pages):
                    page_text = page.extract_text() or ""
                    pages_info.append({
                        "page_number": index + 1,
                        "text_length": len(page_text),
                        "has_text": bool(page_text.strip()),
                        "rotation": page.get('/Rotate', 0),
                    })
                info["pages_info"] = pages_info
                return info
        except Exception as e:
            return {"error": f"PDF结构分析失败: {str(e)}"}

    @staticmethod
    def search_text_in_pdf(pdf_path: str, search_term: str) -> List[Dict[str, Any]]:
        """Locate *search_term* on every page (via PyMuPDF).

        Returns:
            One dict per hit with the 1-based page number, the search term, the
            hit rectangle and the text clipped to that rectangle; or a
            one-element list with an ``"error"`` dict on failure.
        """
        try:
            if not PDF_AVAILABLE:
                return [{"error": PDFTools._NOT_INSTALLED}]

            doc = fitz.open(pdf_path)
            try:
                search_results = []
                for page_num in range(len(doc)):
                    page = doc.load_page(page_num)
                    for inst in page.search_for(search_term):
                        search_results.append({
                            "page_number": page_num + 1,
                            "text": search_term,
                            "bbox": inst,
                            "context": page.get_text("text", clip=inst),
                        })
                return search_results
            finally:
                doc.close()
        except Exception as e:
            return [{"error": f"PDF文本搜索失败: {str(e)}"}]

    @staticmethod
    def summarize_pdf_content(pdf_path: str) -> str:
        """Ask the configured OpenAI chat model to summarise the PDF text.

        At most the first 4000 characters of the extracted text are sent.

        Returns:
            The model's answer, or a message string on failure.
        """
        try:
            if not PDF_AVAILABLE:
                return PDFTools._NOT_INSTALLED

            text = PDFTools._read_all_text(pdf_path)
            if not text.strip():
                return "PDF中没有找到文本内容"

            # Imported lazily so the rest of the class works without langchain_openai.
            from langchain_openai import ChatOpenAI
            from langchain_core.messages import HumanMessage

            llm = ChatOpenAI(
                model=Config.OPENAI_MODEL,
                temperature=0.3,
                api_key=Config.OPENAI_API_KEY
            )

            # Long documents are truncated (not chunked) to bound the prompt size.
            if len(text) > 4000:
                text = text[:4000] + "..."

            prompt = (
                "\n"
                "请总结以下PDF文档的主要内容:\n"
                f"{text}\n"
                "请提供:\n"
                "1. 文档的主要主题\n"
                "2. 关键要点\n"
                "3. 重要信息摘要\n"
                "4. 文档类型和用途\n"
            )
            response = llm.invoke([HumanMessage(content=prompt)])
            return response.content
        except Exception as e:
            return f"PDF内容总结失败: {str(e)}"
class MediaTools:
    """Helpers for analysing images and videos with Hugging Face pipelines.

    Every method is a static method and reports failures through its return
    value (a message string or an ``"error"`` entry) instead of raising.
    """

    @staticmethod
    def extract_text_from_image(image_path: str) -> str:
        """Run the TrOCR handwritten-text model on an image and return the text."""
        try:
            ocr_pipeline = pipeline(
                "image-to-text",
                model="microsoft/trocr-base-handwritten",
                device=0 if torch.cuda.is_available() else -1
            )
            # The context manager closes the underlying file handle.
            with Image.open(image_path) as image:
                result = ocr_pipeline(image)
            return result[0]['generated_text']
        except Exception as e:
            return f"文本提取失败: {str(e)}"

    @staticmethod
    def analyze_image_emotion(
        image_path: str,
        model_name: str = "microsoft/DialoGPT-medium",
    ) -> Dict[str, Any]:
        """Classify an image and return the top three labels.

        Args:
            image_path: Path of the image to classify.
            model_name: Hugging Face model id used for the
                ``image-classification`` pipeline.

        Returns:
            ``{"emotions": [...], "confidence": top_score}`` or
            ``{"error": message}`` on failure.
        """
        # NOTE(review): the default "microsoft/DialoGPT-medium" is named like a
        # dialogue text model, not an image classifier; confirm and pass a
        # suitable image-emotion model via ``model_name``.
        try:
            emotion_pipeline = pipeline(
                "image-classification",
                model=model_name,
                device=0 if torch.cuda.is_available() else -1
            )
            with Image.open(image_path) as image:
                result = emotion_pipeline(image)
            return {
                "emotions": result[:3],
                "confidence": result[0]['score'] if result else 0.0
            }
        except Exception as e:
            return {"error": f"情感分析失败: {str(e)}"}

    @staticmethod
    def extract_video_audio(video_path: str) -> str:
        """Placeholder: audio extraction is not implemented; returns a notice."""
        return "视频音频分析功能需要安装moviepy包"

    @staticmethod
    def analyze_video_content(video_path: str) -> Dict[str, Any]:
        """Sample up to ten frames of a video and caption each one with BLIP.

        Returns:
            A dict with basic video properties (duration, fps, frame count,
            resolution) and a caption for every sampled frame that could be
            read; or ``{"error": message}`` on failure.
        """
        try:
            cap = cv2.VideoCapture(video_path)
            try:
                if not cap.isOpened():
                    return {"error": "无法打开视频文件"}

                fps = cap.get(cv2.CAP_PROP_FPS)
                frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                duration = frame_count / fps if fps > 0 else 0
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

                # Spread the samples evenly over the whole video.
                frame_interval = max(1, frame_count // 10)
                caption_pipeline = None
                frames_analyzed = []
                for sample in range(min(frame_count, 10)):
                    frame_number = sample * frame_interval
                    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
                    ret, frame = cap.read()
                    if not ret:
                        continue

                    # Build the captioning model once, and only when a frame
                    # was actually read, instead of reloading it per frame.
                    if caption_pipeline is None:
                        caption_pipeline = pipeline(
                            "image-to-text",
                            model="Salesforce/blip-image-captioning-base",
                            device=0 if torch.cuda.is_available() else -1
                        )

                    # OpenCV delivers BGR; PIL and the model expect RGB.
                    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    caption_result = caption_pipeline(pil_image)
                    frames_analyzed.append({
                        "frame_number": frame_number,
                        "caption": caption_result[0]['generated_text']
                    })
            finally:
                # Release the capture even when analysis fails midway.
                cap.release()

            return {
                "video_info": {
                    "duration": duration,
                    "fps": fps,
                    "frame_count": frame_count,
                    "resolution": f"{width}x{height}"
                },
                "frames_analyzed": frames_analyzed,
                "analysis_method": "OpenCV + BLIP"
            }
        except Exception as e:
            return {"error": f"视频分析失败: {str(e)}"}
class CodeAnalysisTools:
    """Helpers for inspecting, running and explaining Python source code.

    Every method is a static method and reports failures through its return
    value instead of raising.
    """

    @staticmethod
    def analyze_python_code(code: str) -> Dict[str, Any]:
        """Parse *code* and report simple structural metrics.

        Returns:
            For valid code: syntax status, counts and names of functions
            (including ``async def``) and classes, the import count and a
            simplified complexity score (number of ``if``/``while``/``for``/
            ``async for``/``except`` nodes). For invalid code: only
            ``syntax_valid`` and ``syntax_error``. ``{"error": message}`` on
            any other failure.
        """
        try:
            try:
                tree = ast.parse(code)
            except SyntaxError as e:
                return {
                    "syntax_valid": False,
                    "syntax_error": str(e)
                }

            # Walk the tree once and classify the nodes afterwards.
            nodes = list(ast.walk(tree))
            functions = [
                node for node in nodes
                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
            ]
            classes = [node for node in nodes if isinstance(node, ast.ClassDef)]
            imports = [
                node for node in nodes
                if isinstance(node, (ast.Import, ast.ImportFrom))
            ]
            branching = (ast.If, ast.While, ast.For, ast.AsyncFor, ast.ExceptHandler)
            complexity = sum(1 for node in nodes if isinstance(node, branching))

            return {
                "syntax_valid": True,
                "syntax_error": None,
                "function_count": len(functions),
                "class_count": len(classes),
                "import_count": len(imports),
                "complexity": complexity,
                "functions": [f.name for f in functions],
                "classes": [c.name for c in classes]
            }
        except Exception as e:
            return {"error": f"代码分析失败: {str(e)}"}

    @staticmethod
    def execute_python_code(code: str) -> str:
        """Run *code* in a separate Python process with a 30-second timeout.

        Returns:
            The captured stdout prefixed with a success marker, the captured
            stderr prefixed with a failure marker, or a message string when
            the run times out or cannot be started.
        """
        # SECURITY: this executes arbitrary code with the privileges of the
        # current process; only pass trusted input or run inside a sandbox.
        temp_file = None
        try:
            # UTF-8 is Python's default source encoding, so write the script
            # with it explicitly instead of relying on the platform locale.
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.py', delete=False, encoding='utf-8'
            ) as f:
                f.write(code)
                temp_file = f.name

            result = subprocess.run(
                [sys.executable, temp_file],
                capture_output=True,
                text=True,
                timeout=30
            )
            if result.returncode == 0:
                return f"执行成功:\n{result.stdout}"
            return f"执行失败:\n{result.stderr}"
        except subprocess.TimeoutExpired:
            return "代码执行超时"
        except Exception as e:
            return f"代码执行失败: {str(e)}"
        finally:
            # Remove the script on every path, including timeout and errors.
            if temp_file is not None:
                try:
                    os.unlink(temp_file)
                except OSError:
                    pass  # best-effort cleanup; nothing useful to report

    @staticmethod
    def explain_code(code: str) -> str:
        """Ask the configured OpenAI chat model to explain *code*.

        Returns:
            The model's answer, or a message string on failure.
        """
        try:
            # Imported lazily so the rest of the class works without langchain_openai.
            from langchain_openai import ChatOpenAI
            from langchain_core.messages import HumanMessage

            llm = ChatOpenAI(
                model=Config.OPENAI_MODEL,
                temperature=0.3,
                api_key=Config.OPENAI_API_KEY
            )
            prompt = (
                "\n"
                "请分析以下Python代码的功能和作用:\n"
                "```python\n"
                f"{code}\n"
                "```\n"
                "请提供:\n"
                "1. 代码的主要功能\n"
                "2. 关键部分的解释\n"
                "3. 可能的改进建议\n"
            )
            response = llm.invoke([HumanMessage(content=prompt)])
            return response.content
        except Exception as e:
            return f"代码解释失败: {str(e)}"
class SearchTools:
    """Thin wrapper around the DuckDuckGo search tool (no API key needed)."""

    def __init__(self):
        """Create the underlying DuckDuckGo search tool and announce it."""
        self.search_tool = DuckDuckGoSearchRun()
        print("✅ DuckDuckGo搜索引擎已初始化")

    def _search_as_list(self, search_query: str, banner: str, failure: str) -> List[str]:
        """Run *search_query* and wrap a string result in a one-element list.

        *banner* prefixes the progress line that is printed before the search;
        *failure* labels both the printed and the returned error message.
        """
        try:
            print(f"{banner}: {search_query}")
            outcome = self.search_tool.run(search_query)
            if isinstance(outcome, str):
                return [outcome]
            return outcome
        except Exception as e:
            print(f"❌ {failure}: {str(e)}")
            return [f"{failure}: {str(e)}"]

    def web_search(self, query: str) -> str:
        """Run a web search and return the result as a string.

        Failures are printed and returned as a message string.
        """
        try:
            print(f"🔍 搜索查询: {query}")
            outcome = self.search_tool.run(query)
            if isinstance(outcome, str):
                return outcome
            return str(outcome)
        except Exception as e:
            print(f"❌ 搜索失败: {str(e)}")
            return f"搜索失败: {str(e)}"

    def search_images(self, query: str) -> List[str]:
        """Search for images by appending ``images`` to the query.

        The raw search output is returned; image URLs are not parsed out.
        """
        return self._search_as_list(f"{query} images", "🖼️ 图像搜索查询", "图像搜索失败")

    def search_videos(self, query: str) -> List[str]:
        """Search for videos by appending ``videos`` to the query."""
        return self._search_as_list(f"{query} videos", "🎥 视频搜索查询", "视频搜索失败")

    def search_pdfs(self, query: str) -> List[str]:
        """Search for PDF documents by appending ``filetype:pdf`` to the query."""
        return self._search_as_list(f"{query} filetype:pdf", "📄 PDF搜索查询", "PDF搜索失败")
class AnalysisTools:
    """Text analysis helpers built on Hugging Face pipelines.

    Every method is a static method and reports failures through its return
    value instead of raising.
    """

    @staticmethod
    def analyze_text_sentiment(text: str) -> Dict[str, Any]:
        """Classify the sentiment of *text*.

        Returns:
            ``{"sentiment": label, "confidence": score, "text": text}`` or
            ``{"error": message}`` on failure.
        """
        try:
            sentiment_pipeline = pipeline(
                "sentiment-analysis",
                model="cardiffnlp/twitter-roberta-base-sentiment-latest",
                device=0 if torch.cuda.is_available() else -1
            )
            top = sentiment_pipeline(text)[0]
            return {
                "sentiment": top['label'],
                "confidence": top['score'],
                "text": text
            }
        except Exception as e:
            return {"error": f"情感分析失败: {str(e)}"}

    @staticmethod
    def extract_keywords(text: str) -> List[str]:
        """Return person/organisation/location tokens found by an NER model.

        Duplicates are removed while keeping first-occurrence order. When
        nothing is found a one-element placeholder list is returned; on
        failure a one-element list with a message string is returned.
        """
        try:
            keyword_pipeline = pipeline(
                "token-classification",
                model="dbmdz/bert-large-cased-finetuned-conll03-english",
                device=0 if torch.cuda.is_available() else -1
            )
            wanted = {'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC'}
            keywords = [
                item['word'] for item in keyword_pipeline(text)
                if item['entity'] in wanted
            ]
            # dict.fromkeys de-duplicates with a stable order (a set does not).
            return list(dict.fromkeys(keywords)) if keywords else ["无关键词"]
        except Exception as e:
            return [f"关键词提取失败: {str(e)}"]

    @staticmethod
    def summarize_text(text: str, max_length: int = 150) -> str:
        """Summarise *text* with ``facebook/bart-large-cnn``.

        Args:
            text: Text to summarise. Texts longer than 1000 characters are cut
                into 1000-character chunks; only the first three are
                summarised, each with a third of the length budget.
            max_length: Upper bound passed to the summarizer for the result.

        Returns:
            The summary, or a message string on failure.
        """
        try:
            summarizer = pipeline(
                "summarization",
                model="facebook/bart-large-cnn",
                device=0 if torch.cuda.is_available() else -1
            )

            if len(text) > 1000:
                chunks = [text[i:i + 1000] for i in range(0, len(text), 1000)]
                chunk_max = max(1, max_length // 3)
                # Keep min_length from exceeding the per-chunk maximum.
                chunk_min = min(30, chunk_max)
                summaries = [
                    summarizer(
                        chunk, max_length=chunk_max, min_length=chunk_min, do_sample=False
                    )[0]['summary_text']
                    for chunk in chunks[:3]
                ]
                return " ".join(summaries)

            result = summarizer(
                text,
                max_length=max_length,
                min_length=min(30, max_length),
                do_sample=False,
            )
            return result[0]['summary_text']
        except Exception as e:
            return f"摘要生成失败: {str(e)}"
class UtilityTools:
    """Miscellaneous helpers: weather placeholder, translation, calculator.

    Every method is a static method and reports failures through its return
    value instead of raising.
    """

    @staticmethod
    def get_current_weather(location: str) -> str:
        """Placeholder: no weather API is configured; returns a notice for *location*."""
        return f"天气查询功能需要配置天气API密钥,查询位置: {location}"

    @staticmethod
    def translate_text(text: str, target_language: str = "中文") -> str:
        """Translate English *text* with a Helsinki-NLP model.

        Args:
            text: English source text.
            target_language: ``"中文"`` selects the English-to-Chinese model;
                any other value falls back to the English-to-French model.

        Returns:
            The translated text, or a message string on failure.
        """
        try:
            if target_language == "中文":
                model_name = "Helsinki-NLP/opus-mt-en-zh"
            else:
                model_name = "Helsinki-NLP/opus-mt-en-fr"
            translator = pipeline(
                "translation",
                model=model_name,
                device=0 if torch.cuda.is_available() else -1
            )
            return translator(text)[0]['translation_text']
        except Exception as e:
            return f"翻译失败: {str(e)}"

    @staticmethod
    def calculate_math_expression(expression: str) -> str:
        """Evaluate an arithmetic expression and return the result as a string.

        Besides literals and operators the expression may use ``abs``,
        ``round``, ``min``, ``max``, ``sum``, ``pow``, ``sin``, ``cos``,
        ``tan``, ``sqrt``, ``log``, ``pi`` and ``e``.

        Returns:
            ``str(result)``, or a message string when evaluation fails.
        """
        # ``__builtins__`` is a dict in imported modules but a module object
        # in ``__main__``; the ``builtins`` module works in both cases.
        import builtins

        try:
            allowed_names = {
                name: getattr(builtins, name)
                for name in ('abs', 'round', 'min', 'max', 'sum', 'pow')
            }
            allowed_names.update({
                'sin': lambda x: np.sin(x),
                'cos': lambda x: np.cos(x),
                'tan': lambda x: np.tan(x),
                'sqrt': lambda x: np.sqrt(x),
                'log': lambda x: np.log(x),
                'pi': np.pi,
                'e': np.e
            })
            # SECURITY: eval() with emptied builtins is not a real sandbox
            # (attribute access on literals can still reach arbitrary objects).
            # Do not feed untrusted input without a proper expression parser.
            result = eval(expression, {"__builtins__": {}}, allowed_names)
            return str(result)
        except Exception as e:
            return f"计算失败: {str(e)}"
| class WikipediaTools: | |
| """Wikipedia处理工具类""" | |
def search_wikipedia(query: str, max_results: int = 5) -> List[Dict[str, Any]]:
    """Search Chinese Wikipedia and return a short record per hit.

    Each record holds the title, URL, a summary capped at 300 characters and
    the page id. Hits whose page is missing or cannot be loaded get a
    placeholder record with ``page_id`` set to ``None``. On failure a
    one-element list with an ``"error"`` dict is returned.
    """

    def _placeholder(page_title, note):
        # Record for a hit whose page is missing or could not be loaded.
        return {
            'title': page_title,
            'url': f"https://zh.wikipedia.org/wiki/{page_title.replace(' ', '_')}",
            'summary': note,
            'page_id': None
        }

    try:
        if not WIKIPEDIA_AVAILABLE:
            return [{"error": "Wikipedia处理功能未安装,请运行: pip install wikipedia-api requests beautifulsoup4"}]

        wiki = wikipediaapi.Wikipedia(
            language='zh',
            user_agent='MultiModalAgent/1.0 (https://github.com/your-repo; your-email@example.com)'
        )

        # NOTE(review): confirm the installed wikipedia-api version provides
        # ``Wikipedia.search(query, results=...)`` and that it yields titles.
        hits = wiki.search(query, results=max_results)

        records = []
        for title in hits:
            try:
                page = wiki.page(title)
                if not page.exists():
                    records.append(_placeholder(title, "页面不存在"))
                    continue

                summary = page.summary
                if len(summary) > 300:
                    summary = summary[:300] + "..."
                records.append({
                    'title': page.title,
                    'url': page.fullurl,
                    'summary': summary,
                    'page_id': page.pageid
                })
            except Exception as e:
                # Keep the hit, but only with its title and the error text.
                records.append(_placeholder(title, f"无法获取摘要: {str(e)}"))
        return records
    except Exception as e:
        return [{"error": f"Wikipedia搜索失败: {str(e)}"}]
def get_wikipedia_page(title: str) -> Dict[str, Any]:
    """Load a Chinese Wikipedia page and return its main fields.

    Returns:
        A dict with title, URL, summary, content (capped at 5000 characters),
        page id, up to 10 categories, up to 20 links and the full content
        length; or ``{"error": message}`` when the page does not exist or
        loading fails.
    """
    try:
        if not WIKIPEDIA_AVAILABLE:
            return {"error": "Wikipedia处理功能未安装,请运行: pip install wikipedia-api requests beautifulsoup4"}

        wiki = wikipediaapi.Wikipedia(
            language='zh',
            user_agent='MultiModalAgent/1.0 (https://github.com/your-repo; your-email@example.com)'
        )
        page = wiki.page(title)
        if not page.exists():
            return {"error": f"Wikipedia页面 '{title}' 不存在"}

        full_text = page.text
        if len(full_text) > 5000:
            content = full_text[:5000] + "..."
        else:
            content = full_text

        category_names = list(page.categories.keys())
        link_titles = list(page.links.keys())
        return {
            'title': page.title,
            'url': page.fullurl,
            'summary': page.summary,
            'content': content,
            'page_id': page.pageid,
            'categories': category_names[:10],
            'links': link_titles[:20],
            'content_length': len(full_text)
        }
    except Exception as e:
        return {"error": f"Wikipedia页面获取失败: {str(e)}"}
def get_wikipedia_summary(title: str) -> str:
    """Return the first five sentences of a Chinese Wikipedia page summary.

    The original body called ``wikipedia.set_lang`` / ``wikipedia.summary``,
    but only ``wikipediaapi`` is imported by this module, so every call ended
    in a NameError and returned the failure message. This version uses
    ``wikipediaapi``.

    Returns:
        The shortened summary, or a message string when the page does not
        exist or loading fails.
    """
    try:
        if not WIKIPEDIA_AVAILABLE:
            return "Wikipedia处理功能未安装,请运行: pip install wikipedia-api requests beautifulsoup4"

        import re

        wiki = wikipediaapi.Wikipedia(
            language='zh',
            user_agent='MultiModalAgent/1.0 (https://github.com/your-repo; your-email@example.com)'
        )
        page = wiki.page(title)
        if not page.exists():
            return f"Wikipedia摘要获取失败: 页面 '{title}' 不存在"

        summary = page.summary
        # Cut after the fifth sentence terminator (CJK or Latin punctuation;
        # a Latin full stop only counts when followed by whitespace or the end).
        sentence_ends = [
            match.end()
            for match in re.finditer(r'[。！？!?]|\.(?=\s|$)', summary)
        ]
        if len(sentence_ends) > 5:
            summary = summary[:sentence_ends[4]]
        return summary
    except Exception as e:
        return f"Wikipedia摘要获取失败: {str(e)}"
def get_wikipedia_random_page() -> Dict[str, Any]:
    """Pick a random Chinese Wikipedia article and return its page info.

    The original body called ``wikipedia.set_lang`` / ``wikipedia.random``,
    but only ``wikipediaapi`` is imported by this module, so every call ended
    in a NameError. This version asks the MediaWiki Action API
    (``list=random``) for a title and loads it through
    ``WikipediaTools.get_wikipedia_page``.

    Returns:
        The page-info dict from ``get_wikipedia_page``, or
        ``{"error": message}`` on failure.
    """
    try:
        if not WIKIPEDIA_AVAILABLE:
            return {"error": "Wikipedia处理功能未安装,请运行: pip install wikipedia-api requests beautifulsoup4"}

        response = requests.get(
            "https://zh.wikipedia.org/w/api.php",
            params={
                "action": "query",
                "list": "random",
                "rnnamespace": 0,  # main (article) namespace only
                "rnlimit": 1,
                "format": "json",
            },
            headers={
                'User-Agent': 'MultiModalAgent/1.0 (https://github.com/your-repo; your-email@example.com)'
            },
            timeout=30,
        )
        response.raise_for_status()

        random_pages = response.json().get("query", {}).get("random", [])
        if random_pages:
            return WikipediaTools.get_wikipedia_page(random_pages[0]["title"])
        return {"error": "无法获取随机页面"}
    except Exception as e:
        return {"error": f"随机Wikipedia页面获取失败: {str(e)}"}
def search_wikipedia_english(query: str, max_results: int = 5) -> List[Dict[str, Any]]:
    """Search English Wikipedia and describe each hit.

    Args:
        query: Free-text search query.
        max_results: Maximum number of titles requested from the search.

    Returns:
        One dict per hit with ``title``, ``url``, ``summary`` (cut to 300
        characters plus ``"..."`` when longer) and ``page_id``.  A hit whose
        page cannot be loaded still appears, with a guessed URL, the failure
        text as summary and ``page_id`` set to ``None``.  If the search itself
        fails the list holds a single ``error`` dict.
    """
    try:
        if not WIKIPEDIA_AVAILABLE:
            return [{"error": "Wikipedia处理功能未安装,请运行: pip install wikipedia-api requests beautifulsoup4"}]

        wikipedia.set_lang("en")
        hits = wikipedia.search(query, results=max_results)

        described = []
        for title in hits:
            try:
                article = wikipedia.page(title, auto_suggest=False)
                entry = {'title': title, 'url': article.url}
                if len(article.summary) > 300:
                    entry['summary'] = article.summary[:300] + "..."
                else:
                    entry['summary'] = article.summary
                entry['page_id'] = article.pageid
            except Exception as exc:
                # Page lookup failed: fall back to a title-only entry.
                entry = {
                    'title': title,
                    'url': f"https://en.wikipedia.org/wiki/{title.replace(' ', '_')}",
                    'summary': f"无法获取摘要: {exc!s}",
                    'page_id': None
                }
            described.append(entry)
        return described
    except Exception as exc:
        return [{"error": f"英文Wikipedia搜索失败: {exc!s}"}]
def get_wikipedia_page_english(title: str) -> Dict[str, Any]:
    """Fetch an English-Wikipedia article through the ``wikipedia`` module.

    Args:
        title: Exact article title (auto-suggestion is turned off).

    Returns:
        A dict with ``title``, ``url``, ``summary``, ``content`` (cut to 5000
        characters plus ``"..."`` when longer), ``page_id``, ``categories``
        (first 10), ``links`` (first 20), ``references`` (first 10),
        ``images`` (first 10) and ``content_length``; or a dict with a single
        ``error`` key on failure.
    """
    try:
        if not WIKIPEDIA_AVAILABLE:
            return {"error": "Wikipedia处理功能未安装,请运行: pip install wikipedia-api requests beautifulsoup4"}

        wikipedia.set_lang("en")
        article = wikipedia.page(title, auto_suggest=False)
        body = article.content

        details = {
            'title': article.title,
            'url': article.url,
            'summary': article.summary,
        }
        if len(body) > 5000:
            details['content'] = body[:5000] + "..."
        else:
            details['content'] = body
        details['page_id'] = article.pageid
        details['categories'] = article.categories[:10]
        details['links'] = article.links[:20]
        # References and images are treated as optional attributes.
        if hasattr(article, 'references'):
            details['references'] = article.references[:10]
        else:
            details['references'] = []
        if hasattr(article, 'images'):
            details['images'] = article.images[:10]
        else:
            details['images'] = []
        details['content_length'] = len(body)
        return details
    except Exception as exc:
        return {"error": f"英文Wikipedia页面获取失败: {exc!s}"}
def get_wikipedia_suggestions(query: str) -> List[str]:
    """Return up to ten Chinese-Wikipedia search results for ``query``.

    Args:
        query: Free-text search query.

    Returns:
        The value returned by ``wikipedia.search``; on failure a one-element
        list holding the error message.
    """
    try:
        if not WIKIPEDIA_AVAILABLE:
            return ["Wikipedia处理功能未安装,请运行: pip install wikipedia-api requests beautifulsoup4"]

        wikipedia.set_lang("zh")
        return wikipedia.search(query, results=10)
    except Exception as exc:
        return [f"Wikipedia搜索建议获取失败: {exc!s}"]
def get_wikipedia_categories(title: str) -> List[str]:
    """Return the first 20 categories of a Chinese-Wikipedia article.

    Args:
        title: Exact article title (auto-suggestion is turned off).

    Returns:
        A slice of the page's ``categories``; on failure a one-element list
        holding the error message.
    """
    try:
        if not WIKIPEDIA_AVAILABLE:
            return ["Wikipedia处理功能未安装,请运行: pip install wikipedia-api requests beautifulsoup4"]

        wikipedia.set_lang("zh")
        article = wikipedia.page(title, auto_suggest=False)
        return article.categories[:20]
    except Exception as exc:
        return [f"Wikipedia分类获取失败: {exc!s}"]
def get_wikipedia_links(title: str) -> List[str]:
    """Return the first 30 links of a Chinese-Wikipedia article.

    Args:
        title: Exact article title (auto-suggestion is turned off).

    Returns:
        A slice of the page's ``links``; on failure a one-element list
        holding the error message.
    """
    try:
        if not WIKIPEDIA_AVAILABLE:
            return ["Wikipedia处理功能未安装,请运行: pip install wikipedia-api requests beautifulsoup4"]

        wikipedia.set_lang("zh")
        article = wikipedia.page(title, auto_suggest=False)
        return article.links[:30]
    except Exception as exc:
        return [f"Wikipedia链接获取失败: {exc!s}"]
def get_wikipedia_geosearch(latitude: float, longitude: float, radius: int = 1000) -> List[Dict[str, Any]]:
    """Find Chinese-Wikipedia articles near a coordinate.

    Args:
        latitude: Latitude of the search centre.
        longitude: Longitude of the search centre.
        radius: Search radius passed straight to ``wikipedia.geosearch``.

    Returns:
        One dict per nearby article with ``title``, ``url``, ``summary`` (cut
        to 200 characters plus ``"..."`` when longer), ``distance`` and
        ``coordinates``.  ``distance`` is always ``None`` because the search
        result carries titles only.  An article that cannot be loaded still
        appears with a guessed URL and the failure text as summary.  If the
        search itself fails the list holds a single ``error`` dict.
    """
    try:
        if not WIKIPEDIA_AVAILABLE:
            return [{"error": "Wikipedia处理功能未安装,请运行: pip install wikipedia-api requests beautifulsoup4"}]
        # The ``wikipedia`` module keeps the language as global state.
        wikipedia.set_lang("zh")
        # geosearch() returns article *titles* (plain strings), not page
        # objects, so every hit has to be resolved with wikipedia.page().
        nearby_titles = wikipedia.geosearch(latitude, longitude, radius=radius)
        results = []
        for title in nearby_titles:
            try:
                page = wikipedia.page(title, auto_suggest=False)
                summary = page.summary
                try:
                    # Converted to floats so the result stays serialisable.
                    coordinates = tuple(float(part) for part in page.coordinates)
                except Exception:
                    # Best effort: an article without usable coordinates
                    # should still be listed.
                    coordinates = None
                results.append({
                    'title': page.title,
                    'url': page.url,
                    'summary': summary[:200] + "..." if len(summary) > 200 else summary,
                    'distance': None,
                    'coordinates': coordinates
                })
            except Exception as e:
                results.append({
                    'title': title,
                    'url': f"https://zh.wikipedia.org/wiki/{title.replace(' ', '_')}",
                    'summary': f"无法获取摘要: {str(e)}",
                    'distance': None,
                    'coordinates': None
                })
        return results
    except Exception as e:
        return [{"error": f"Wikipedia地理搜索失败: {str(e)}"}]
| class YouTubeTools: | |
| """YouTube视频处理工具类""" | |
def download_youtube_video(url: str) -> str:
    """Download a YouTube video (at most 720p) with yt-dlp.

    The file is written using the ``%(title)s.%(ext)s`` output template.

    Args:
        url: URL of the video.

    Returns:
        The file name yt-dlp prepared for the download, or a human-readable
        message when a dependency flag is off or the download raises.
    """
    try:
        if not YOUTUBE_AVAILABLE:
            return "YouTube处理功能未安装,请运行: pip install pytube"
        if not YT_DLP_AVAILABLE:
            return "YouTube视频下载需要安装yt-dlp,请运行: pip install yt-dlp"

        options = dict(
            format='best[height<=720]',
            outtmpl='%(title)s.%(ext)s',
            quiet=True,
            no_warnings=True,
        )
        with yt_dlp.YoutubeDL(options) as downloader:
            metadata = downloader.extract_info(url, download=True)
            saved_as = downloader.prepare_filename(metadata)
        return saved_as
    except Exception as exc:
        return f"YouTube视频下载失败: {exc!s}"
def get_youtube_info(url: str) -> Dict[str, Any]:
    """Collect metadata for a YouTube video.

    yt-dlp is tried first when ``YT_DLP_AVAILABLE`` is set, then pytube when
    ``YOUTUBE_AVAILABLE`` is set.  If neither succeeds, a placeholder record
    built from the video id alone is returned.

    Args:
        url: A ``youtube.com/watch?v=``, ``youtube.com/shorts/``,
            ``youtube.com/embed/`` or ``youtu.be/`` URL.

    Returns:
        A dict with ``title``, ``author``, ``length``, ``views``,
        ``description`` (cut to 500 characters plus ``"..."`` when longer),
        ``publish_date``, ``rating``, ``keywords``, ``thumbnail_url``,
        ``video_id``, ``url`` and either ``method`` (which backend answered)
        or ``note`` (placeholder record).  An unrecognised URL or an
        unexpected exception yields a dict with a single ``error`` key.
    """
    try:
        import re

        def _clip(text, limit=500):
            # A backend may report the description as None even though the
            # key is present; treat that as an empty string instead of
            # crashing on None[:limit].
            text = text or ''
            return text[:limit] + "..." if len(text) > limit else text

        # Extract the video id.
        video_id_match = re.search(
            r'(?:youtube\.com\/(?:watch\?v=|shorts\/|embed\/)|youtu\.be\/)([^&\n?#]+)',
            url
        )
        if not video_id_match:
            return {"error": "无效的YouTube URL"}
        video_id = video_id_match.group(1)

        # Preferred backend: yt-dlp.
        if YT_DLP_AVAILABLE:
            try:
                import yt_dlp
                ydl_opts = {
                    'quiet': True,
                    'no_warnings': True,
                    'extract_flat': True
                }
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(url, download=False)
                video_info = {
                    'title': info.get('title', f'YouTube视频 {video_id}'),
                    'author': info.get('uploader', 'Unknown'),
                    'length': info.get('duration', 0),
                    'views': info.get('view_count', 0),
                    'description': _clip(info.get('description')),
                    'publish_date': str(info.get('upload_date', 'Unknown')),
                    'rating': info.get('average_rating', 0),
                    'keywords': info.get('tags', []),
                    'thumbnail_url': info.get('thumbnail', f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg"),
                    'video_id': video_id,
                    'url': url,
                    'method': 'yt-dlp'
                }
                return video_info
            except Exception as e:
                # Deliberate best effort: report and fall through to pytube.
                print(f"yt-dlp获取失败: {e}")

        # Second backend: pytube.
        if YOUTUBE_AVAILABLE:
            try:
                from pytube import YouTube
                yt = YouTube(url)
                video_info = {
                    'title': yt.title,
                    'author': yt.author,
                    'length': yt.length,  # seconds
                    'views': yt.views,
                    'description': _clip(yt.description),
                    'publish_date': str(yt.publish_date) if yt.publish_date else "Unknown",
                    'rating': yt.rating,
                    'keywords': yt.keywords,
                    'thumbnail_url': yt.thumbnail_url,
                    'video_id': video_id,
                    'url': url,
                    'method': 'pytube'
                }
                return video_info
            except Exception as e:
                # Deliberate best effort: report and fall through.
                print(f"pytube获取失败: {e}")

        # Both backends failed or are missing: return a placeholder record.
        return {
            'title': f"YouTube视频 {video_id}",
            'author': "Unknown",
            'length': 0,
            'views': 0,
            'description': "无法获取详细信息,可能需要更新YouTube处理库",
            'publish_date': "Unknown",
            'rating': 0,
            'keywords': [],
            'thumbnail_url': f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg",
            'video_id': video_id,
            'url': url,
            'note': "所有YouTube处理库都失败,建议更新pytube或安装yt-dlp"
        }
    except Exception as e:
        return {"error": f"YouTube信息获取失败: {str(e)}"}
def extract_youtube_audio(url: str) -> str:
    """Download a YouTube video's audio track and convert it to MP3.

    yt-dlp downloads the best audio stream and its ``FFmpegExtractAudio``
    post-processor re-encodes it to 192 kbps MP3, written using the
    ``%(title)s.%(ext)s`` output template.

    Args:
        url: URL of the video.

    Returns:
        Path of the resulting ``.mp3`` file, or a human-readable message when
        a dependency flag is off or the extraction raises.
    """
    try:
        if not YOUTUBE_AVAILABLE:
            return "YouTube处理功能未安装,请运行: pip install pytube"
        if not YT_DLP_AVAILABLE:
            return "YouTube音频提取需要安装yt-dlp,请运行: pip install yt-dlp"
        ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
            'outtmpl': '%(title)s.%(ext)s',
            'quiet': True,
            'no_warnings': True
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            downloaded_path = ydl.prepare_filename(info)
        # prepare_filename() reports the name of the downloaded stream; the
        # post-processor output only differs in its extension.  Swap the real
        # extension instead of string-replacing '.webm'/'.m4a', which misses
        # other containers and can mangle titles containing those substrings.
        audio_path = os.path.splitext(downloaded_path)[0] + '.mp3'
        return audio_path
    except Exception as e:
        return f"YouTube音频提取失败: {str(e)}"
def download_youtube_thumbnail(url: str) -> str:
    """Download a YouTube video's thumbnail into a temporary JPEG file.

    The thumbnail URL is taken from pytube; if that fails, the standard
    ``img.youtube.com/vi/<id>/maxresdefault.jpg`` address is used.

    Args:
        url: A ``youtube.com/watch?v=`` or ``youtu.be/`` URL.

    Returns:
        Path of the temporary ``.jpg`` file (the caller is responsible for
        deleting it), or a human-readable message when the dependency flag is
        off, the URL is not recognised, or the download raises.
    """
    try:
        if not YOUTUBE_AVAILABLE:
            return "YouTube处理功能未安装,请运行: pip install pytube"
        # Extract the video id.
        import re
        video_id_match = re.search(r'(?:youtube\.com\/watch\?v=|youtu\.be\/)([^&\n?#]+)', url)
        if not video_id_match:
            return "无效的YouTube URL"
        video_id = video_id_match.group(1)
        # Ask pytube for the thumbnail URL first.
        try:
            yt = YouTube(url)
            thumbnail_url = yt.thumbnail_url
        except Exception:
            # pytube failed: fall back to the standard thumbnail URL.
            thumbnail_url = f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg"
        import tempfile
        import urllib.request
        # mkstemp() creates the file atomically; the deprecated mktemp() only
        # returns a name and leaves a window in which another process can
        # claim it.
        fd, temp_path = tempfile.mkstemp(suffix='.jpg')
        os.close(fd)
        try:
            urllib.request.urlretrieve(thumbnail_url, temp_path)
        except Exception:
            # Do not leave an empty temp file behind when the download fails.
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise
        return temp_path
    except Exception as e:
        return f"YouTube缩略图下载失败: {str(e)}"
def search_youtube_videos(query: str, max_results: int = 5) -> List[Dict[str, Any]]:
    """Search for YouTube videos through DuckDuckGo.

    Runs a ``"<query> site:youtube.com"`` text search and keeps only hits
    whose URL contains ``youtube.com/watch``.

    Args:
        query: Free-text search query.
        max_results: Maximum number of search hits requested.  The returned
            list may be shorter because non-video hits are dropped.

    Returns:
        One dict per video with ``title``, ``url``, ``duration``,
        ``view_count``, ``uploader``, ``thumbnail`` and ``description`` (cut
        to 200 characters plus ``"..."`` when longer).  Only ``title``,
        ``url`` and ``description`` come from the search; the other fields
        are fixed placeholders.  On failure the list holds a single ``error``
        dict.
    """
    try:
        if not YOUTUBE_AVAILABLE:
            return [{"error": "YouTube处理功能未安装,请运行: pip install pytube"}]
        from duckduckgo_search import DDGS
        try:
            with DDGS() as ddgs:
                search_results = list(ddgs.text(f"{query} site:youtube.com", max_results=max_results))
            videos = []
            for result in search_results:
                if not result:
                    continue
                # DDGS text results carry the URL under 'href'; 'link' is
                # kept as a fallback so dicts shaped that way still work.
                link = result.get('href') or result.get('link') or ''
                if 'youtube.com/watch' not in link:
                    continue
                body = result.get('body') or ''
                videos.append({
                    'title': result.get('title', 'Unknown'),
                    'url': link,
                    'duration': 0,
                    'view_count': 0,
                    'uploader': 'Unknown',
                    'thumbnail': '',
                    'description': body[:200] + "..." if len(body) > 200 else body
                })
            return videos
        except Exception as search_error:
            return [{"error": f"DuckDuckGo搜索失败: {str(search_error)}"}]
    except Exception as e:
        return [{"error": f"YouTube搜索失败: {str(e)}"}]
def analyze_youtube_comments(url: str, max_comments: int = 10) -> List[Dict[str, Any]]:
    """Fetch the first comments of a YouTube video with yt-dlp.

    Args:
        url: URL of the video.
        max_comments: Maximum number of comments to return.

    Returns:
        One dict per comment with ``author``, ``text``, ``like_count``,
        ``time`` and ``reply_count`` (missing fields fall back to the
        defaults shown in the code).  The list is empty when the video
        exposes no comments.  On failure the list holds a single ``error``
        dict.
    """
    try:
        if not YOUTUBE_AVAILABLE:
            return [{"error": "YouTube处理功能未安装,请运行: pip install pytube yt-dlp"}]
        # This function talks to yt-dlp directly, so check for it explicitly
        # instead of failing later with a NameError.
        if not YT_DLP_AVAILABLE:
            return [{"error": "YouTube评论分析需要安装yt-dlp,请运行: pip install yt-dlp"}]
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
            # 'getcomments' is the embedding option that makes yt-dlp put
            # comments into the info dict.
            'getcomments': True
        }
        if max_comments > 0:
            # Ask the extractor to stop early rather than paging through
            # every comment only to slice the list afterwards.
            ydl_opts['extractor_args'] = {'youtube': {'max_comments': [str(max_comments)]}}
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        comments = []
        # The key may be absent or None when comments are unavailable.
        for comment in (info.get('comments') or [])[:max_comments]:
            comments.append({
                'author': comment.get('author', 'Unknown'),
                'text': comment.get('text', ''),
                'like_count': comment.get('like_count', 0),
                'time': comment.get('time', ''),
                'reply_count': comment.get('reply_count', 0)
            })
        return comments
    except Exception as e:
        return [{"error": f"YouTube评论分析失败: {str(e)}"}]
def get_youtube_playlist_info(playlist_url: str) -> Dict[str, Any]:
    """Summarise a YouTube playlist and its first ten entries.

    Uses yt-dlp in flat-extraction mode with ``playlist_items`` set to
    ``'1-10'``; nothing is downloaded.

    Args:
        playlist_url: URL of the playlist.

    Returns:
        A dict with ``title``, ``description`` (cut to 500 characters plus
        ``"..."`` when longer), ``video_count``, ``uploader`` and ``videos``
        (a list of dicts with ``title``, ``url``, ``duration`` and
        ``uploader``).  On failure a dict with a single ``error`` key.
    """
    try:
        if not YOUTUBE_AVAILABLE:
            return {"error": "YouTube处理功能未安装,请运行: pip install pytube"}
        if not YT_DLP_AVAILABLE:
            return {"error": "YouTube播放列表功能需要安装yt-dlp,请运行: pip install yt-dlp"}
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,
            'playlist_items': '1-10'  # only the first 10 videos
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(playlist_url, download=False)
        # The description key can be present with a None value; slicing None
        # would turn an otherwise valid result into an error.
        description = info.get('description') or ''
        playlist_info = {
            'title': info.get('title', 'Unknown'),
            'description': description[:500] + "..." if len(description) > 500 else description,
            'video_count': info.get('playlist_count', 0),
            'uploader': info.get('uploader', 'Unknown'),
            'videos': []
        }
        # Entries may be missing or None, and individual entries may be
        # falsy; skip those.
        for entry in info.get('entries') or []:
            if entry:
                playlist_info['videos'].append({
                    'title': entry.get('title', 'Unknown'),
                    'url': entry.get('url', ''),
                    'duration': entry.get('duration', 0),
                    'uploader': entry.get('uploader', 'Unknown')
                })
        return playlist_info
    except Exception as e:
        return {"error": f"YouTube播放列表信息获取失败: {str(e)}"}
def download_youtube_video_for_watching(url: str, quality: str = "720p") -> str:
    """Download a YouTube video into ``downloads/`` while printing progress.

    Args:
        url: URL of the video.
        quality: Height limit such as ``"720p"``; every ``p`` is stripped and
            the remainder is used in yt-dlp's ``best[height<=...]`` selector.

    Returns:
        A message naming the downloaded file, or a human-readable message
        when a dependency flag is off or the download raises.
    """
    try:
        if not YOUTUBE_AVAILABLE:
            return "YouTube处理功能未安装,请运行: pip install pytube"
        if not YT_DLP_AVAILABLE:
            return "YouTube视频下载需要安装yt-dlp,请运行: pip install yt-dlp"

        def report_progress(event):
            # yt-dlp calls the hook for several phases; only the download
            # phase is reported.
            if event['status'] == 'downloading':
                print(f"下载进度: {event.get('_percent_str', '0%')}")

        options = {
            'format': f'best[height<={quality.replace("p", "")}]',
            'outtmpl': 'downloads/%(title)s.%(ext)s',
            'quiet': False,
            'no_warnings': False,
            'progress_hooks': [report_progress]
        }

        # Make sure the target directory exists before yt-dlp writes to it.
        import os
        os.makedirs('downloads', exist_ok=True)

        with yt_dlp.YoutubeDL(options) as downloader:
            metadata = downloader.extract_info(url, download=True)
            saved_as = downloader.prepare_filename(metadata)
        return f"视频已下载到: {saved_as}"
    except Exception as exc:
        return f"YouTube视频下载失败: {exc!s}"
def extract_youtube_audio_for_listening(url: str, format: str = "mp3") -> str:
    """Download a YouTube video's best audio stream into ``downloads/``.

    No post-processing is configured, so the file keeps whatever container
    yt-dlp downloads.

    Args:
        url: URL of the video.
        format: Accepted for interface compatibility; the body does not read
            it.

    Returns:
        A message naming the downloaded file, or a human-readable message
        when a dependency flag is off or the download raises.
    """
    try:
        if not YOUTUBE_AVAILABLE:
            return "YouTube处理功能未安装,请运行: pip install pytube"
        if not YT_DLP_AVAILABLE:
            return "YouTube音频提取需要安装yt-dlp,请运行: pip install yt-dlp"

        options = dict(
            format='bestaudio/best',
            outtmpl='downloads/%(title)s.%(ext)s',
            quiet=False,
            no_warnings=False,
        )

        # Make sure the target directory exists before yt-dlp writes to it.
        import os
        os.makedirs('downloads', exist_ok=True)

        with yt_dlp.YoutubeDL(options) as downloader:
            metadata = downloader.extract_info(url, download=True)
            saved_as = downloader.prepare_filename(metadata)
        return f"音频已提取到: {saved_as} (原始格式,可用播放器播放)"
    except Exception as exc:
        return f"YouTube音频提取失败: {exc!s}"
def transcribe_youtube_video(url: str) -> str:
    """Transcribe the speech of a YouTube video to text.

    The best audio stream is downloaded into ``downloads/`` with yt-dlp,
    converted to a temporary WAV file with pydub, and sent to
    ``speech_recognition``'s Google recogniser with language ``zh-CN``.  The
    temporary WAV file is removed afterwards; the downloaded audio is kept.

    Args:
        url: URL of the video.

    Returns:
        The transcript prefixed with a heading line, or a human-readable
        message when a dependency flag is off or any step raises.
    """
    try:
        if not YOUTUBE_AVAILABLE:
            return "YouTube处理功能未安装,请运行: pip install pytube"
        if not YT_DLP_AVAILABLE:
            return "YouTube视频转录需要安装yt-dlp,请运行: pip install yt-dlp"
        if not AUDIO_PROCESSING_AVAILABLE:
            return "音频转录功能需要安装SpeechRecognition和pydub,请运行: pip install SpeechRecognition pydub"
        # Download the audio first.
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': 'downloads/%(title)s.%(ext)s',
            'quiet': True,
            'no_warnings': True
        }
        import os
        os.makedirs('downloads', exist_ok=True)
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            audio_path = ydl.prepare_filename(info)
        # Derive the WAV name from the real extension.  String-replacing only
        # '.webm'/'.m4a' leaves other extensions untouched, so the "WAV" path
        # would equal the source path and the cleanup below would delete the
        # download itself.
        stem = os.path.splitext(audio_path)[0]
        wav_path = stem + '.wav'
        if wav_path == audio_path:
            wav_path = stem + '_transcribe.wav'
        try:
            # Convert to WAV for the recogniser.
            audio = AudioSegment.from_file(audio_path)
            audio.export(wav_path, format="wav")
            recognizer = sr.Recognizer()
            with sr.AudioFile(wav_path) as source:
                audio_data = recognizer.record(source)
            text = recognizer.recognize_google(audio_data, language='zh-CN')
        finally:
            # Remove the temporary WAV even when conversion or recognition
            # fails.
            if os.path.exists(wav_path):
                os.remove(wav_path)
        return f"视频转录结果:\n{text}"
    except Exception as e:
        return f"YouTube视频转录失败: {str(e)}"
def analyze_youtube_video_content(url: str) -> Dict[str, Any]:
    """Analyse both the picture and the sound track of a YouTube video.

    Looks up the video's metadata through ``YouTubeTools.get_youtube_info``.
    When ``YT_DLP_AVAILABLE`` is set it then downloads the video into
    ``downloads/``, samples frames with OpenCV, captions up to ten of them
    with a ``transformers`` image-to-text pipeline, downloads the audio track
    and tries to transcribe it: whisper first, then pydub +
    speech_recognition if an ``ffmpeg`` executable responds.

    Args:
        url: URL of the video to analyse.

    Returns:
        A dict with ``video_info``, ``visual_analysis``, ``audio_analysis``,
        ``transcription``, ``summary`` and ``key_points``.
        ``visual_analysis`` and ``audio_analysis`` are dicts when the
        corresponding step produced a result and plain message strings
        otherwise.  If the metadata lookup reports an error, that dict is
        returned unchanged; an unexpected exception yields a dict with a
        single ``error`` key.
    """
    try:
        # Fetch the video's metadata first.
        video_info = YouTubeTools.get_youtube_info(url)
        if 'error' in video_info:
            return video_info
        # Every section starts as a "not available" message and is
        # overwritten by the step that handles it.
        analysis_result = {
            'video_info': video_info,
            'visual_analysis': "视频视觉分析功能不可用",
            'audio_analysis': "音频分析功能不可用",
            'transcription': "音频转录功能不可用"
        }
        # 1. Download the video for visual analysis.
        if YT_DLP_AVAILABLE:
            try:
                # Download the video file.
                ydl_opts = {
                    'format': 'best[height<=720]',  # cap the resolution
                    'outtmpl': 'downloads/%(title)s.%(ext)s',
                    'quiet': True,
                    'no_warnings': True
                }
                import os
                os.makedirs('downloads', exist_ok=True)
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(url, download=True)
                    video_path = ydl.prepare_filename(info)
                # 2. Extract key frames for visual analysis.
                try:
                    import cv2
                    import numpy as np
                    from PIL import Image
                    cap = cv2.VideoCapture(video_path)
                    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                    fps = cap.get(cv2.CAP_PROP_FPS)
                    duration = frame_count / fps if fps > 0 else 0
                    # Sample key frames: one frame per second of video.
                    # NOTE(review): every sampled frame is written to disk,
                    # but only the first ten are captioned below and none are
                    # deleted in this function - confirm that is intended.
                    key_frames = []
                    frame_interval = max(1, int(fps))
                    for i in range(0, frame_count, frame_interval):
                        cap.set(cv2.CAP_PROP_POS_FRAMES, i)
                        ret, frame = cap.read()
                        if ret:
                            # Convert the BGR frame to a PIL image.
                            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                            pil_image = Image.fromarray(frame_rgb)
                            # Save the key frame.
                            frame_path = f"downloads/frame_{i//frame_interval:03d}.jpg"
                            pil_image.save(frame_path, "JPEG", quality=85)
                            key_frames.append({
                                'frame_number': i,
                                'timestamp': i / fps if fps > 0 else 0,
                                'path': frame_path
                            })
                    cap.release()
                    # 3. Caption the key frames with an image-to-text model.
                    try:
                        from transformers import pipeline
                        # Image captioning model.
                        image_to_text = pipeline("image-to-text", model="nlpconnect/vit-gpt2-image-captioning")
                        visual_descriptions = []
                        for frame_info in key_frames[:10]:  # only the first 10 frames are analysed
                            try:
                                description = image_to_text(frame_info['path'])[0]['generated_text']
                                visual_descriptions.append({
                                    'timestamp': frame_info['timestamp'],
                                    'description': description
                                })
                            except Exception as e:
                                # A single failed frame is reported and skipped.
                                print(f"帧分析失败: {e}")
                        analysis_result['visual_analysis'] = {
                            'video_path': video_path,
                            'duration': duration,
                            'fps': fps,
                            'frame_count': frame_count,
                            'key_frames_analyzed': len(visual_descriptions),
                            'visual_descriptions': visual_descriptions,
                            'summary': f"视频包含{len(visual_descriptions)}个关键场景"
                        }
                    except Exception as e:
                        analysis_result['visual_analysis'] = f"VLLM视觉分析失败: {str(e)}"
                except Exception as e:
                    analysis_result['visual_analysis'] = f"视频帧提取失败: {str(e)}"
            except Exception as e:
                analysis_result['visual_analysis'] = f"视频下载失败: {str(e)}"
        # 4. Audio analysis and transcription.
        if YT_DLP_AVAILABLE:
            try:
                # Download the audio track (a second, separate download).
                ydl_opts = {
                    'format': 'bestaudio/best',
                    'outtmpl': 'downloads/%(title)s_audio.%(ext)s',
                    'quiet': True,
                    'no_warnings': True
                }
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(url, download=True)
                    audio_path = ydl.prepare_filename(info)
                # Transcribe the audio, trying several methods in turn.
                try:
                    # Method 1: try whisper first.
                    # NOTE(review): the original comment says whisper needs
                    # no ffmpeg - confirm against the whisper installation
                    # notes.
                    try:
                        import whisper
                        print("🎤 使用whisper进行音频转录...")
                        model = whisper.load_model("base")
                        result = model.transcribe(audio_path)
                        transcription_text = result["text"]
                        analysis_result['transcription'] = transcription_text
                        # NOTE(review): verify that the transcribe() result
                        # carries a 'duration' key; if it does not, the
                        # duration reported here is always 0.
                        analysis_result['audio_analysis'] = {
                            'audio_path': audio_path,
                            'duration': result.get('duration', 0),
                            'transcription': transcription_text,
                            'method': 'whisper',
                            'summary': f"音频时长{result.get('duration', 0):.1f}秒,已转录为文字"
                        }
                        print("✅ whisper转录成功")
                    except ImportError:
                        print("⚠️ whisper未安装,尝试其他方法...")
                        # Method 2: pydub + speech_recognition, only if ffmpeg is available.
                        try:
                            from pydub import AudioSegment
                            import speech_recognition as sr
                            # Probe for an ffmpeg executable.
                            import subprocess
                            try:
                                subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
                                ffmpeg_available = True
                                print("✅ ffmpeg可用,使用pydub+speech_recognition")
                            except:
                                # Any failure of the probe counts as "ffmpeg missing".
                                ffmpeg_available = False
                                print("❌ ffmpeg不可用")
                            if ffmpeg_available:
                                # Convert to WAV.
                                # NOTE(review): for extensions other than
                                # .webm/.m4a wav_path equals audio_path, so
                                # the export overwrites the download and the
                                # cleanup below removes it - confirm.
                                audio = AudioSegment.from_file(audio_path)
                                wav_path = audio_path.replace('.webm', '.wav').replace('.m4a', '.wav')
                                audio.export(wav_path, format="wav")
                                # Speech recognition.
                                recognizer = sr.Recognizer()
                                with sr.AudioFile(wav_path) as source:
                                    audio_data = recognizer.record(source)
                                transcription_text = recognizer.recognize_google(audio_data, language='zh-CN')
                                analysis_result['transcription'] = transcription_text
                                analysis_result['audio_analysis'] = {
                                    'audio_path': audio_path,
                                    'duration': len(audio) / 1000,  # seconds
                                    'transcription': transcription_text,
                                    'method': 'pydub+speech_recognition',
                                    'summary': f"音频时长{len(audio)/1000:.1f}秒,已转录为文字"
                                }
                                # Remove the temporary WAV file.
                                import os
                                if os.path.exists(wav_path):
                                    os.remove(wav_path)
                            else:
                                # Method 3: report the audio file only, without a transcript.
                                analysis_result['transcription'] = "音频转录需要安装whisper或ffmpeg"
                                analysis_result['audio_analysis'] = {
                                    'audio_path': audio_path,
                                    'duration': 'unknown',
                                    'transcription': '需要ffmpeg或whisper进行转录',
                                    'method': 'audio_only',
                                    'summary': f"音频已下载到: {audio_path},需要安装whisper或ffmpeg进行转录"
                                }
                        except Exception as e:
                            print(f"❌ pydub+speech_recognition失败: {e}")
                            analysis_result['transcription'] = f"音频转录失败: {str(e)}"
                            analysis_result['audio_analysis'] = {
                                'audio_path': audio_path,
                                'duration': 'unknown',
                                'transcription': f'转录失败: {str(e)}',
                                'method': 'failed',
                                'summary': f"音频已下载,但转录失败: {str(e)}"
                            }
                except Exception as e:
                    # Reached when whisper is installed but raises something
                    # other than ImportError.
                    analysis_result['transcription'] = f"音频转录失败: {str(e)}"
                    analysis_result['audio_analysis'] = {
                        'audio_path': audio_path,
                        'duration': 'unknown',
                        'transcription': f'转录失败: {str(e)}',
                        'method': 'failed',
                        'summary': f"音频已下载,但转录失败: {str(e)}"
                    }
            except Exception as e:
                analysis_result['audio_analysis'] = f"音频下载失败: {str(e)}"
        # 5. Combine the results into a summary.
        # NOTE(review): the string concatenations below raise TypeError if
        # 'title' or 'author' is present but None, which would replace the
        # whole result with the generic error dict - confirm upstream values.
        analysis_result['summary'] = f"这是一个关于{video_info.get('title', '未知主题')}的视频,时长{video_info.get('length', 0)}秒"
        analysis_result['key_points'] = [
            "视频标题: " + video_info.get('title', 'Unknown'),
            "作者: " + video_info.get('author', 'Unknown'),
            "时长: " + str(video_info.get('length', 0)) + "秒",
            "观看次数: " + str(video_info.get('views', 0)),
            "视觉分析: " + ("已完成" if isinstance(analysis_result['visual_analysis'], dict) else "失败"),
            "音频分析: " + ("已完成" if isinstance(analysis_result['audio_analysis'], dict) else "失败")
        ]
        return analysis_result
    except Exception as e:
        return {"error": f"YouTube视频内容分析失败: {str(e)}"}
| class ToolManager: | |
| """工具管理器""" | |
def __init__(self):
    """Instantiate every tool collection and index their tools by name.

    After construction ``self.tools`` maps each tool name to the attribute
    of the same name on the collection that provides it.
    """
    self.media_tools = MediaTools()
    self.code_tools = CodeAnalysisTools()
    self.pdf_tools = PDFTools()
    self.search_tools = SearchTools()
    self.analysis_tools = AnalysisTools()
    self.utility_tools = UtilityTools()
    self.web_tools = WebTools()
    self.youtube_tools = YouTubeTools()
    self.wikipedia_tools = WikipediaTools()

    # (provider, tool names) pairs; each registry key equals the attribute
    # name looked up on the provider.
    catalogue = (
        (self.pdf_tools, (
            'download_pdf_from_url',
            'extract_text_from_pdf',
            'extract_images_from_pdf',
            'analyze_pdf_structure',
            'search_text_in_pdf',
            'summarize_pdf_content',
        )),
        (self.media_tools, (
            'extract_text_from_image',
            'analyze_image_emotion',
            'extract_video_audio',
            'analyze_video_content',
        )),
        (self.code_tools, (
            'analyze_python_code',
            'execute_python_code',
            'explain_code',
        )),
        (self.search_tools, (
            'web_search',
            'search_images',
            'search_videos',
            'search_pdfs',
        )),
        (self.analysis_tools, (
            'analyze_text_sentiment',
            'extract_keywords',
            'summarize_text',
        )),
        (self.utility_tools, (
            'get_current_weather',
            'translate_text',
            'calculate_math_expression',
        )),
        (self.web_tools, (
            'fetch_webpage_content',
            'extract_text_from_webpage',
            'analyze_webpage_structure',
            'search_content_in_webpage',
            'extract_links_from_webpage',
            'summarize_webpage_content',
            'check_webpage_accessibility',
        )),
        (self.youtube_tools, (
            'download_youtube_video',
            'get_youtube_info',
            'extract_youtube_audio',
            'download_youtube_thumbnail',
            'search_youtube_videos',
            'analyze_youtube_comments',
            'get_youtube_playlist_info',
            'download_youtube_video_for_watching',
            'extract_youtube_audio_for_listening',
            'transcribe_youtube_video',
            'analyze_youtube_video_content',
        )),
        (self.wikipedia_tools, (
            'search_wikipedia',
            'get_wikipedia_page',
            'get_wikipedia_summary',
            'get_wikipedia_random_page',
            'search_wikipedia_english',
            'get_wikipedia_page_english',
            'get_wikipedia_suggestions',
            'get_wikipedia_categories',
            'get_wikipedia_links',
            'get_wikipedia_geosearch',
        )),
    )

    self.tools = {}
    for provider, tool_names in catalogue:
        for tool_name in tool_names:
            self.tools[tool_name] = getattr(provider, tool_name)
def get_tool(self, tool_name: str):
    """Look up a registered tool; returns ``None`` for an unknown name."""
    registry = self.tools
    return registry.get(tool_name)
def list_tools(self) -> List[str]:
    """Return the names of all registered tools, in registration order."""
    return [tool_name for tool_name in self.tools.keys()]
def execute_tool(self, tool_name: str, **kwargs) -> Any:
    """Run a registered tool with keyword arguments.

    The tool is invoked through the first entry point it offers: its
    ``func`` attribute, its ``__wrapped__`` attribute, its ``run`` method,
    or, for a plain function or bound method, the object itself.

    Args:
        tool_name: Name the tool was registered under.
        **kwargs: Arguments forwarded to the tool unchanged.

    Returns:
        Whatever the tool returns.

    Raises:
        ValueError: If no tool is registered under ``tool_name``.
        TypeError: If the registered object offers none of the entry points
            above.
    """
    tool = self.get_tool(tool_name)
    if not tool:
        raise ValueError(f"工具 '{tool_name}' 不存在")
    if hasattr(tool, 'func'):
        # Decorated tool object: call the original function directly.
        return tool.func(**kwargs)
    if hasattr(tool, '__wrapped__'):
        # Fallback for wrappers that expose the wrapped callable.
        return tool.__wrapped__(**kwargs)
    if hasattr(tool, 'run'):
        return tool.run(**kwargs)
    if callable(tool):
        # Undecorated function or bound method: it has no .run, so calling
        # .run unconditionally would raise AttributeError.
        return tool(**kwargs)
    raise TypeError(f"Tool '{tool_name}' is not callable")
def should_use_search(self, question: str, context: Dict[str, Any]) -> bool:
    """Decide heuristically whether a question calls for a web search.

    Keywords are matched as substrings of the lower-cased question, in this
    order: a "local task" keyword vetoes searching; otherwise a "lookup"
    keyword or a recency marker triggers it; otherwise only questions longer
    than 50 characters are searched.

    Args:
        question: The user's question.
        context: Accepted for interface compatibility; the body does not
            read it.

    Returns:
        ``True`` if a search should be performed, ``False`` otherwise.
    """
    lowered = question.lower()

    def mentions(terms):
        return any(term in lowered for term in terms)

    # Tasks other tools handle directly; these never trigger a search.
    local_task_terms = (
        '计算', 'calculate', 'math', '数学',
        '代码', 'code', 'python', 'program',
        '翻译', 'translate',
        '天气', 'weather',
        '情感', 'sentiment', 'emotion',
        '关键词', 'keywords',
        '摘要', 'summary', 'summarize',
        'pdf', '文档', 'document',
    )
    if mentions(local_task_terms):
        return False

    # Questions that ask for facts or comparisons.
    lookup_terms = (
        '最新', 'latest', 'news', '新闻',
        '什么是', 'what is', 'how to', '如何',
        '价格', 'price', 'cost',
        '地点', 'location', 'where',
        '时间', 'time', 'when',
        '比较', 'compare', 'vs',
        '推荐', 'recommend', 'best',
    )
    if mentions(lookup_terms):
        return True

    # Markers of time-sensitive questions.
    if mentions(('2024', '2023', 'today', 'now', 'current')):
        return True

    # Default: search only for long questions.
    return len(question) > 50