import os


def run_web_search(query, num_results=5, domain_filter=""):
    """
    Run a web search using the Tavily API.

    Args:
        query (str): Search query.
        num_results (int): Number of results to retrieve.
        domain_filter (str): Optional domain filter (comma-separated domains).

    Returns:
        list[dict] | dict: Tavily response. It may return a list directly or
            a dict with a "results" key.

    Raises:
        ImportError: If the tavily-python package is missing.
        ValueError: If the TAVILY_API_KEY env var is not set.
    """
    try:
        from tavily import TavilyClient
    except ImportError:
        raise ImportError("Please install tavily-python")
    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        raise ValueError("TAVILY_API_KEY environment variable is required")
    client = TavilyClient(api_key=api_key)
    # Tavily's search() takes max_results, not "num".
    params = {"max_results": num_results}
    if domain_filter:
        # Tavily supports domain restriction natively via include_domains,
        # which expects a list of domains.
        params["include_domains"] = [d.strip() for d in domain_filter.split(",") if d.strip()]
    results = client.search(query, **params)
    return results

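# Example usage (a sketch, left as a comment so it does not run on import;
# assumes TAVILY_API_KEY is set and tavily-python is installed):
#
#     raw = run_web_search("spaced repetition learning research", num_results=3)
#     hits = raw.get("results", []) if isinstance(raw, dict) else raw
#     for hit in hits:
#         print(hit.get("title"), "->", hit.get("url"))
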
# ---------------------------------------------------------------------------
# Extended helper functions for credible research and extraction.
# ---------------------------------------------------------------------------
import io
from typing import Dict, List, Optional
from urllib.parse import parse_qs, urlparse

# PyPDF2 is optional: it is listed in requirements, but if it is missing,
# PDF extraction is simply disabled.
try:
    from PyPDF2 import PdfReader  # type: ignore
except ImportError:
    PdfReader = None

# Import DB helpers from the sibling module. Note: db.py resides in the same
# package directory.
from db import get_resource, upsert_resource

def web_search(query: str, max_results: int = 5, allowed_domains: Optional[List[str]] = None) -> List[Dict]:
    """
    Perform a web search and return a list of result dictionaries, filtered
    by allowed domains.

    Args:
        query: Search string.
        max_results: Maximum number of results to return.
        allowed_domains: Optional list of domains to permit. If provided, only
            results whose URL domain ends with one of these domains are
            included.

    Returns:
        A list of search results (dicts with at least 'url' and 'title' keys).
    """
    raw_results = run_web_search(query, num_results=max_results)
    # Tavily can return either a list or a dict with a 'results' key.
    results_list = raw_results.get("results", []) if isinstance(raw_results, dict) else raw_results or []
    # Filter out results that do not match the allowed domains, if specified.
    filtered: List[Dict] = []
    for item in results_list:
        if not isinstance(item, dict):
            continue
        url = item.get("url", "")
        # Basic domain filtering: keep the result if no allow-list was given,
        # or if the URL's domain ends with one of the allowed domains.
        if allowed_domains:
            try:
                domain = urlparse(url).netloc.lower()
                if not any(domain.endswith(ad.lower()) for ad in allowed_domains):
                    continue
            except Exception:
                continue
        filtered.append(item)
        if len(filtered) >= max_results:
            break
    return filtered

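# Example usage (a sketch; the query and domain list are illustrative only).
# Suffix matching means ".edu" admits any .edu site:
#
#     results = web_search(
#         "bloom taxonomy cognitive levels",
#         max_results=3,
#         allowed_domains=["wikipedia.org", ".edu"],
#     )
#     for r in results:
#         print(r.get("title"), r.get("url"))
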
def fetch_and_extract(url: str, timeout: int = 15) -> Optional[Dict]:
    """
    Fetch a web page and extract its main textual content. Caches results in
    the database.

    Args:
        url: The URL to fetch.
        timeout: HTTP timeout in seconds.

    Returns:
        A dictionary with keys: url, title, source, excerpt, meta, or None on
        failure.
    """
    # Return the cached record if present.
    cached = get_resource(url)
    if cached:
        return cached
    # Attempt to fetch the page.
    try:
        import requests
        from bs4 import BeautifulSoup
    except ImportError:
        raise ImportError("Please install requests and beautifulsoup4")
    try:
        resp = requests.get(url, timeout=timeout, headers={"User-Agent": "CourseCreatorBot/1.0"})
        resp.raise_for_status()
    except Exception:
        return None
    # If the response is a PDF (by content type or URL), extract text with
    # PyPDF2 rather than parsing the raw bytes as HTML.
    content_type = resp.headers.get("Content-Type", "").lower()
    is_pdf = content_type.startswith("application/pdf") or url.lower().endswith(".pdf")
    if is_pdf and PdfReader is None:
        # Without PyPDF2 there is no sensible way to extract PDF text, and
        # feeding PDF bytes to the HTML parser would cache garbage.
        return None
    if is_pdf:
        try:
            pdf_stream = io.BytesIO(resp.content)
            reader = PdfReader(pdf_stream)
            all_text = ""
            for page in reader.pages:
                try:
                    text = page.extract_text() or ""
                except Exception:
                    text = ""
                all_text += text + "\n"
            if not all_text.strip():
                return None
            excerpt = all_text[:2000]
            # Use the URL as the title for PDFs.
            title = url
            domain = urlparse(url).netloc
            upsert_resource(url, title, domain, excerpt, meta={"length": len(all_text), "pdf": True})
            return get_resource(url)
        except Exception:
            # If PDF extraction fails, fall through to HTML extraction.
            pass
    # Parse HTML.
    soup = BeautifulSoup(resp.text, "html.parser")
    # Title: fall back to the URL if missing.
    title = (soup.title.string.strip() if soup.title and soup.title.string else url)[:200]
    # Extract paragraph text.
    paragraphs = [p.get_text(" ", strip=True) for p in soup.find_all("p") if p.get_text(strip=True)]
    content_text = "\n".join(paragraphs)
    excerpt = content_text[:2000]
    # Use the domain as the source.
    domain = urlparse(url).netloc
    # Store in the DB and return the cached form.
    upsert_resource(url, title, domain, excerpt, meta={"length": len(content_text)})
    return get_resource(url)

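# Example usage (a sketch; assumes db.py's get_resource/upsert_resource are
# backed by a working store and the URL is reachable):
#
#     record = fetch_and_extract("https://example.com/article")
#     if record:
#         print(record["title"], "-", record["source"])
#         print(record["excerpt"][:200])
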
# Extract content from a given URL using the Tavily Extract API.
def extract_web_content(url):
    """Extract the main content of a web page via Tavily Extract.

    Args:
        url (str): The URL of the page to extract.

    Returns:
        dict: The Tavily extract response containing page content and
            metadata.

    Raises:
        ImportError: If the tavily-python package is missing.
        ValueError: If the TAVILY_API_KEY environment variable is not set.
    """
    try:
        from tavily import TavilyClient
    except ImportError:
        raise ImportError("Please install tavily-python")
    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        raise ValueError("TAVILY_API_KEY environment variable is required")
    client = TavilyClient(api_key=api_key)
    # Call the extract endpoint to retrieve structured content from the URL.
    response = client.extract(url)
    return response

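# Example usage (a sketch; the response shape below assumes the Tavily
# Extract payload, typically a dict with "results" and "failed_results"):
#
#     data = extract_web_content("https://example.com/post")
#     for page in data.get("results", []):
#         print(page.get("url"), len(page.get("raw_content") or ""))
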
# Get a YouTube video transcript given its URL.
def get_youtube_transcript(video_url):
    """Fetch the transcript of a YouTube video using youtube-transcript-api.

    Args:
        video_url (str): The full URL to a YouTube video.

    Returns:
        str: The concatenated transcript text, or an empty string if none is
            found.

    Raises:
        ImportError: If youtube-transcript-api is not installed.
    """
    try:
        from youtube_transcript_api import YouTubeTranscriptApi
    except ImportError:
        raise ImportError("Please install youtube-transcript-api for YouTube transcript extraction")
    # Parse the video ID from the URL.
    parsed = urlparse(video_url)
    video_id = None
    if "youtube.com" in parsed.netloc:
        # Standard link; the ID is in the "v" query parameter.
        query = parse_qs(parsed.query)
        video_id = query.get("v", [None])[0]
    elif "youtu.be" in parsed.netloc:
        # Shortened link; the path contains the ID.
        video_id = parsed.path.strip("/")
    if not video_id:
        return ""
    try:
        # Note: get_transcript() is the pre-1.0 youtube-transcript-api
        # interface, which returns a list of {"text", "start", "duration"}
        # dicts.
        transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
    except Exception:
        return ""
    # Concatenate all transcript segments into a single string.
    transcript_text = " ".join(seg.get("text", "") for seg in transcript_list)
    return transcript_text
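

if __name__ == "__main__":
    # Minimal smoke test (a sketch; requires network access, TAVILY_API_KEY,
    # and the optional dependencies imported above). The query and video URL
    # are illustrative only.
    demo_results = web_search("learning science retrieval practice", max_results=2)
    for r in demo_results:
        print(r.get("title"), r.get("url"))
    transcript = get_youtube_transcript("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
    print(f"Transcript length: {len(transcript)} characters")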