import os


def run_web_search(query, num_results=5, domain_filter=""):
    """
    Run a web search using the Tavily API.

    Args:
        query (str): Search query.
        num_results (int): Number of results to retrieve.
        domain_filter (str): Optional domain filter (comma-separated domains).

    Returns:
        list[dict] | dict: Tavily response. It may return a list directly or a
        dict with a "results" key.

    Raises:
        ImportError: If the tavily-python package is missing.
        ValueError: If the TAVILY_API_KEY env var is not set.
    """
    try:
        from tavily import TavilyClient
    except ImportError:
        raise ImportError("Please install tavily-python")

    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        raise ValueError("TAVILY_API_KEY environment variable is required")

    client = TavilyClient(api_key=api_key)
    params = {"max_results": num_results}
    if domain_filter:
        # Tavily supports domain restriction via include_domains (a list).
        params["include_domains"] = [d.strip() for d in domain_filter.split(",") if d.strip()]
    results = client.search(query, **params)
    return results


# ---------------------------------------------------------------------------
# Extended helper functions for credible research and extraction.
# ---------------------------------------------------------------------------
import io
import re
from typing import Dict, List, Optional
from urllib.parse import parse_qs, urlparse

# Optional import for PDF extraction.
try:
    from PyPDF2 import PdfReader  # type: ignore
except ImportError:
    # PyPDF2 is expected via requirements; without it, PDF extraction is disabled.
    PdfReader = None

# Import DB helpers from the sibling module. Note: db.py resides in the same
# package directory.
from db import get_resource, upsert_resource


def web_search(query: str, max_results: int = 5,
               allowed_domains: Optional[List[str]] = None) -> List[Dict]:
    """
    Perform a web search and return a list of result dictionaries, optionally
    filtered by allowed domains.

    Args:
        query: Search string.
        max_results: Maximum number of results to return.
        allowed_domains: Optional list of domains to permit. If provided, only
            results whose URL domain ends with one of these domains are included.

    Returns:
        A list of search results (dicts with at least 'url' and 'title' keys).
    """
    raw_results = run_web_search(query, num_results=max_results)
    # Tavily can return either a list or a dict with a 'results' key.
    results_list = raw_results.get("results", []) if isinstance(raw_results, dict) else raw_results or []

    filtered: List[Dict] = []
    for item in results_list:
        if not isinstance(item, dict):
            continue
        url = item.get("url", "")
        # Domain filtering: keep the result only if its domain ends with one
        # of the allowed domains.
        if allowed_domains:
            try:
                domain = urlparse(url).netloc.lower()
                if not any(domain.endswith(ad.lower()) for ad in allowed_domains):
                    continue
            except Exception:
                continue
        filtered.append(item)
        if len(filtered) >= max_results:
            break
    return filtered
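
# A minimal usage sketch for web_search, assuming TAVILY_API_KEY is set in the
# environment; the query and domain list below are illustrative only.
def _demo_web_search() -> None:
    results = web_search(
        "spaced repetition research",
        max_results=3,
        allowed_domains=["edu", "nature.com"],
    )
    for r in results:
        print(r.get("title"), "-", r.get("url"))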
""" # Return cached record if present cached = get_resource(url) if cached: return cached # Attempt to fetch page try: import requests from bs4 import BeautifulSoup except ImportError: raise ImportError("Please install requests and beautifulsoup4") try: resp = requests.get(url, timeout=timeout, headers={"User-Agent": "CourseCreatorBot/1.0"}) resp.raise_for_status() except Exception: return None # If the response is a PDF (by content type or URL), attempt to extract text using PyPDF2 content_type = resp.headers.get("Content-Type", "").lower() if (content_type.startswith("application/pdf") or url.lower().endswith(".pdf")) and PdfReader is not None: try: # Read PDF content pdf_stream = io.BytesIO(resp.content) reader = PdfReader(pdf_stream) all_text = "" for page in reader.pages: try: text = page.extract_text() or "" except Exception: text = "" all_text += text + "\n" if not all_text.strip(): return None excerpt = all_text[:2000] # Use the URL as the title for PDFs title = url # Determine domain try: from urllib.parse import urlparse domain = urlparse(url).netloc except Exception: domain = "" upsert_resource(url, title, domain, excerpt, meta={"length": len(all_text), "pdf": True}) return get_resource(url) except Exception: # If PDF extraction fails, continue with HTML extraction pass # Parse HTML soup = BeautifulSoup(resp.text, "html.parser") # Title: fall back to URL if missing title = (soup.title.string.strip() if soup.title and soup.title.string else url)[:200] # Extract paragraphs paragraphs = [p.get_text(" ", strip=True) for p in soup.find_all("p") if p.get_text(strip=True)] content_text = "\n".join(paragraphs) excerpt = content_text[:2000] # Domain as source try: from urllib.parse import urlparse domain = urlparse(url).netloc except Exception: domain = "" # Store in DB upsert_resource(url, title, domain, excerpt, meta={"length": len(content_text)}) return get_resource(url) # New function to extract content from a given URL using Tavily Extract API. def extract_web_content(url): """Extract the main content of a web page via Tavily Extract. Args: url (str): The URL of the page to extract. Returns: dict: The Tavily extract response containing page content and metadata. Raises: ImportError: If the tavily-python package is missing. ValueError: If the TAVILY_API_KEY environment variable is not set. """ try: from tavily import TavilyClient except ImportError: raise ImportError("Please install tavily-python") api_key = os.getenv("TAVILY_API_KEY") if not api_key: raise ValueError("TAVILY_API_KEY environment variable is required") client = TavilyClient(api_key=api_key) # Call the extract endpoint to retrieve structured content from the URL response = client.extract(url) return response # New function to get a YouTube video transcript given its URL def get_youtube_transcript(video_url): """Fetch the transcript of a YouTube video using youtube-transcript-api. Args: video_url (str): The full URL to a YouTube video. Returns: str: The concatenated transcript text, or an empty string if none found. Raises: ImportError: If youtube-transcript-api is not installed. 
""" # Parse the video ID from the URL try: from urllib.parse import urlparse, parse_qs from youtube_transcript_api import YouTubeTranscriptApi except ImportError: raise ImportError("Please install youtube-transcript-api for YouTube transcript extraction") parsed = urlparse(video_url) video_id = None if "youtube.com" in parsed.netloc: # Extract v parameter query = parse_qs(parsed.query) video_id = query.get("v", [None])[0] elif "youtu.be" in parsed.netloc: # Shortened link; path contains the ID video_id = parsed.path.strip("/") if not video_id: return "" try: transcript_list = YouTubeTranscriptApi.get_transcript(video_id) except Exception: return "" # Concatenate all transcript segments into a single string transcript_text = " ".join(seg.get("text", "") for seg in transcript_list) return transcript_text