File size: 8,434 Bytes
d50fc97
 
 
 
813f3c0
 
 
 
 
 
 
 
 
 
 
 
 
 
d50fc97
 
 
 
 
 
 
 
 
 
 
 
 
 
617daa2
813f3c0
 
 
 
 
 
 
06825b1
 
 
 
 
 
 
 
813f3c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
06825b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
813f3c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
617daa2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import os


def run_web_search(query, num_results=5, domain_filter=""):
    """
    Run a web search using Tavily API.

    Args:
        query (str): Search query.
        num_results (int): Number of results to retrieve.
        domain_filter (str): Optional domain filter (comma-separated domains).

    Returns:
        list[dict] | dict: Tavily response. It may return a list directly or a dict with a "results" key.

    Raises:
        ValueError: If the TAVILY_API_KEY env var is not set.
    """
    try:
        from tavily import TavilyClient
    except ImportError:
        raise ImportError("Please install tavily-python")
    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        raise ValueError("TAVILY_API_KEY environment variable is required")
    client = TavilyClient(api_key=api_key)
    params = {"num": num_results}
    if domain_filter:
        # Tavily does not support domain filter directly; pass as search_kwargs if needed
        params["search_kwargs"] = {"site": domain_filter}
    results = client.search(query, **params)
    return results

# ---------------------------------------------------------------------------
# Extended helper functions for credible research and extraction.
# ---------------------------------------------------------------------------

import re
from typing import List, Dict, Optional

# Additional imports for PDF extraction
import io
try:
    from PyPDF2 import PdfReader  # type: ignore
except ImportError:
    # PyPDF2 will be installed via requirements; if missing, pdf extraction will be disabled
    PdfReader = None

# Import DB helpers from sibling module. Note: db.py resides in the same package directory.
from db import get_resource, upsert_resource

def web_search(query: str, max_results: int = 5, allowed_domains: Optional[List[str]] = None) -> List[Dict]:
    """
    Perform a web search and return a list of result dictionaries, filtering by allowed domains.

    Args:
        query: Search string.
        max_results: Maximum number of results to return.
        allowed_domains: Optional list of domains to permit. If provided, only results with URLs
                         containing one of these domains will be included.

    Returns:
        A list of search results (dicts with at least 'url' and 'title' keys).
    """
    raw_results = run_web_search(query, num_results=max_results)
    # Tavily can return either a list or a dict with 'results'
    results_list = raw_results.get("results", []) if isinstance(raw_results, dict) else raw_results or []
    # Filter out results that do not meet allowed domains, if specified
    filtered: List[Dict] = []
    for item in results_list:
        if not isinstance(item, dict):
            continue
        url = item.get("url", "")
        # Basic domain filtering: allow if allowed_domains is None or URL's domain ends with allowed domain
        if allowed_domains:
            try:
                from urllib.parse import urlparse
                domain = urlparse(url).netloc.lower()
                if not any(domain.endswith(ad.lower()) for ad in allowed_domains):
                    continue
            except Exception:
                continue
        filtered.append(item)
        if len(filtered) >= max_results:
            break
    return filtered

def fetch_and_extract(url: str, timeout: int = 15) -> Optional[Dict]:
    """
    Fetch a web page and extract its main textual content. Caches results in the database.

    Args:
        url: The URL to fetch.
        timeout: HTTP timeout in seconds.

    Returns:
        A dictionary with keys: url, title, source, excerpt, meta, or None on failure.
    """
    # Return cached record if present
    cached = get_resource(url)
    if cached:
        return cached
    # Attempt to fetch page
    try:
        import requests
        from bs4 import BeautifulSoup
    except ImportError:
        raise ImportError("Please install requests and beautifulsoup4")
    try:
        resp = requests.get(url, timeout=timeout, headers={"User-Agent": "CourseCreatorBot/1.0"})
        resp.raise_for_status()
    except Exception:
        return None
    # If the response is a PDF (by content type or URL), attempt to extract text using PyPDF2
    content_type = resp.headers.get("Content-Type", "").lower()
    if (content_type.startswith("application/pdf") or url.lower().endswith(".pdf")) and PdfReader is not None:
        try:
            # Read PDF content
            pdf_stream = io.BytesIO(resp.content)
            reader = PdfReader(pdf_stream)
            all_text = ""
            for page in reader.pages:
                try:
                    text = page.extract_text() or ""
                except Exception:
                    text = ""
                all_text += text + "\n"
            if not all_text.strip():
                return None
            excerpt = all_text[:2000]
            # Use the URL as the title for PDFs
            title = url
            # Determine domain
            try:
                from urllib.parse import urlparse
                domain = urlparse(url).netloc
            except Exception:
                domain = ""
            upsert_resource(url, title, domain, excerpt, meta={"length": len(all_text), "pdf": True})
            return get_resource(url)
        except Exception:
            # If PDF extraction fails, continue with HTML extraction
            pass
    # Parse HTML
    soup = BeautifulSoup(resp.text, "html.parser")
    # Title: fall back to URL if missing
    title = (soup.title.string.strip() if soup.title and soup.title.string else url)[:200]
    # Extract paragraphs
    paragraphs = [p.get_text(" ", strip=True) for p in soup.find_all("p") if p.get_text(strip=True)]
    content_text = "\n".join(paragraphs)
    excerpt = content_text[:2000]
    # Domain as source
    try:
        from urllib.parse import urlparse
        domain = urlparse(url).netloc
    except Exception:
        domain = ""
    # Store in DB
    upsert_resource(url, title, domain, excerpt, meta={"length": len(content_text)})
    return get_resource(url)

# New function to extract content from a given URL using Tavily Extract API.
def extract_web_content(url):
    """Extract the main content of a web page via Tavily Extract.

    Args:
        url (str): The URL of the page to extract.

    Returns:
        dict: The Tavily extract response containing page content and metadata.

    Raises:
        ImportError: If the tavily-python package is missing.
        ValueError: If the TAVILY_API_KEY environment variable is not set.
    """
    try:
        from tavily import TavilyClient
    except ImportError:
        raise ImportError("Please install tavily-python")
    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        raise ValueError("TAVILY_API_KEY environment variable is required")
    client = TavilyClient(api_key=api_key)
    # Call the extract endpoint to retrieve structured content from the URL
    response = client.extract(url)
    return response

# New function to get a YouTube video transcript given its URL
def get_youtube_transcript(video_url):
    """Fetch the transcript of a YouTube video using youtube-transcript-api.

    Args:
        video_url (str): The full URL to a YouTube video.

    Returns:
        str: The concatenated transcript text, or an empty string if none found.

    Raises:
        ImportError: If youtube-transcript-api is not installed.
    """
    # Parse the video ID from the URL
    try:
        from urllib.parse import urlparse, parse_qs
        from youtube_transcript_api import YouTubeTranscriptApi
    except ImportError:
        raise ImportError("Please install youtube-transcript-api for YouTube transcript extraction")
    parsed = urlparse(video_url)
    video_id = None
    if "youtube.com" in parsed.netloc:
        # Extract v parameter
        query = parse_qs(parsed.query)
        video_id = query.get("v", [None])[0]
    elif "youtu.be" in parsed.netloc:
        # Shortened link; path contains the ID
        video_id = parsed.path.strip("/")
    if not video_id:
        return ""
    try:
        transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
    except Exception:
        return ""
    # Concatenate all transcript segments into a single string
    transcript_text = " ".join(seg.get("text", "") for seg in transcript_list)
    return transcript_text