File size: 5,601 Bytes
eff2be4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import asyncio
import json
import logging
from pathlib import Path
from typing import Literal, TypedDict

from playwright.async_api import Page, async_playwright

# CDN location of Mozilla's Readability.js; the URL pins release 0.4.4 of the
# @mozilla/readability package.
READABILITY_JS_URL = "https://unpkg.com/@mozilla/readability@0.4.4/Readability.js"
# Messages are emitted through the logger named "uvicorn.error" rather than a
# logger named after this module.
logger = logging.getLogger("uvicorn.error")


class PageText(TypedDict):
    """Text extracted from one page, paired with the URL that was requested."""

    url: str  # the URL handed to the fetch function
    text: str  # the text extracted from that page


# Values accepted for the ``wait_until`` argument, which this module forwards
# unchanged to ``page.goto``.
WaitUntil = Literal["load", "domcontentloaded", "networkidle", "commit"]


async def _inject_readability(page: Page) -> None:
    """Load Readability.js into ``page`` and expose a parser instance.

    Nothing is injected when the document's root element is not ``HTML``.
    Otherwise the library is added from ``READABILITY_JS_URL`` and a second,
    inline script stores a ``Readability`` object (built from a deep clone of
    the document) on ``window.__readability__``.

    Args:
        page: The page to inject the scripts into.
    """
    root_is_html = await page.evaluate(
        "() => document.documentElement.nodeName === 'HTML'"
    )
    if root_is_html:
        await page.add_script_tag(url=READABILITY_JS_URL)
        # The parser works on a clone so the document itself is not handed to it.
        bootstrap_js = (
            "window.__readability__ = new Readability(document.cloneNode(true));"
        )
        await page.add_script_tag(content=bootstrap_js)


async def _fetch_text(page: Page, url: str, wait_until: WaitUntil) -> str:
    """Navigate ``page`` to ``url`` and return the best text extraction available.

    Three strategies are tried in order; the first one that yields text wins:

    1. Readability.js: the ``textContent`` of the parsed article, stripped.
    2. The inner texts of ``article div[data-testid='tweetText']`` elements,
       joined with newlines.
    3. ``document.body.innerText``.

    Failures in strategies 1 and 2 are logged at debug level and fall through
    to the next strategy. Only ``Exception`` is caught, so task cancellation
    (``asyncio.CancelledError``), ``KeyboardInterrupt`` and ``SystemExit``
    propagate instead of being silently swallowed.

    Args:
        page: The page used for navigation and evaluation.
        url: The address to load.
        wait_until: Load state forwarded to ``page.goto``.

    Returns:
        The extracted text.

    Raises:
        Exception: Whatever ``page.goto``, ``page.wait_for_timeout`` or the
            final ``document.body.innerText`` evaluation raises.
    """
    await page.goto(url, wait_until=wait_until)
    # Fixed one-second pause after navigation before extracting anything
    # (presumably to let late client-side rendering settle).
    await page.wait_for_timeout(1000)

    # Attempt Readability.js parsing first.
    try:
        await _inject_readability(page)
        readability_text = await page.evaluate(
            "() => window.__readability__.parse()?.textContent"
        )
    except Exception as exc:
        logger.debug(f"Readability extraction failed for {url}: {exc!r}")
    else:
        if readability_text:
            return readability_text.strip()

    # Fallback: Twitter specific logic.
    try:
        tweet_text = await page.locator(
            "article div[data-testid='tweetText']"
        ).all_inner_texts()
    except Exception as exc:
        logger.debug(f"Tweet text extraction failed for {url}: {exc!r}")
    else:
        if tweet_text:
            return "\n".join(tweet_text)

    # Final fallback: full body text.
    return await page.evaluate("() => document.body.innerText")


async def fetch_text(
    url: str, headless: bool = False, wait_until: WaitUntil = "load"
) -> PageText:
    """Fetch the text of a single page using a freshly launched Chrome context.

    A persistent Chromium context (``channel="chrome"``) is launched, one page
    is opened, the text is extracted with ``_fetch_text`` and the context is
    closed again. The context is closed even when opening the page or the
    extraction raises.

    Args:
        url: The address to load.
        headless: Whether to launch the browser without a visible window.
        wait_until: Load state forwarded to ``page.goto``.

    Returns:
        A ``PageText`` with the requested ``url`` and the extracted ``text``.

    Raises:
        Exception: Any error raised while launching the browser, opening the
            page or extracting the text.
    """
    async with async_playwright() as pw:
        context = await pw.chromium.launch_persistent_context(
            # NOTE(review): an empty path is expected to make Playwright use a
            # temporary profile directory - confirm against the Playwright docs.
            user_data_dir="",
            channel="chrome",
            headless=headless,
            no_viewport=True,
        )
        try:
            page = await context.new_page()
            text = await _fetch_text(page, url, wait_until)
        finally:
            # Always release the browser context, including on failure.
            await context.close()

    return PageText(url=url, text=text)


async def fetch_texts(
    urls: list[str], headless: bool = False, wait_until: WaitUntil = "load"
) -> list[PageText | BaseException]:
    """Fetch the text of several pages concurrently in one browser context.

    One page is opened per URL and all extractions run together through
    ``asyncio.gather(..., return_exceptions=True)``, so a failure for one URL
    does not abort the others. The browser context is closed even when
    opening a page or gathering raises.

    Args:
        urls: Addresses to load. An empty list returns ``[]`` without
            launching a browser.
        headless: Currently NOT forwarded to the browser launch; see the note
            in the body.
        wait_until: Load state forwarded to ``page.goto`` for every URL.

    Returns:
        One entry per input URL, in input order: a ``PageText`` on success or
        the exception raised by that URL's extraction.
    """
    if not urls:
        # Nothing to fetch: avoid starting Chrome just to return an empty list.
        return []

    async with async_playwright() as pw:
        context = await pw.chromium.launch_persistent_context(
            user_data_dir="",
            channel="chrome",
            # NOTE(review): the ``headless`` argument is ignored and the
            # browser always launches headless. Forwarding it would switch
            # existing callers that rely on the default (False) to a headed
            # browser, so the current behavior is kept - confirm intent.
            headless=True,
            no_viewport=True,
        )
        try:
            pages = [await context.new_page() for _ in urls]
            results_raw = await asyncio.gather(
                *(
                    _fetch_text(page, url, wait_until)
                    for page, url in zip(pages, urls)
                ),
                return_exceptions=True,
            )
        finally:
            # Always release the browser context, including on failure.
            await context.close()

    return [
        outcome
        if isinstance(outcome, BaseException)
        else PageText(url=url, text=outcome)
        for url, outcome in zip(urls, results_raw)
    ]


async def fetch_links_to_json(
    links: list[str],
    output_path: str,
    headless: bool = False,
    wait_until: WaitUntil = "load",
    max_content_length: int = 5000,
) -> None:
    """
    Fetch content from a list of links and save it to a JSON file.

    The file contains a list of ``{"link": ..., "content": ...}`` objects in
    the same order as ``links``. Links that could not be fetched get the
    placeholder content ``"Fail to fetch content..."``. Content longer than
    ``max_content_length`` is cut and suffixed with a truncation marker.

    Args:
        links: List of URLs to fetch content from
        output_path: Path where the JSON file will be saved; missing parent
            directories are created
        headless: Whether to run browser in headless mode (passed on to
            ``fetch_texts``)
        wait_until: When to consider page loading complete
        max_content_length: Maximum number of characters to keep from each page content

    Returns:
        None (saves results to JSON file)
    """
    logger.info(f"📥 Fetching content from {len(links)} links...")

    # Fetch content from all links
    results = await fetch_texts(links, headless=headless, wait_until=wait_until)

    # Process results into the desired format
    json_data: list[dict[str, str]] = []
    # Failures are counted here, not inferred from the stored content, so the
    # summary cannot drift from the placeholder text.
    failed = 0
    for position, (link, result) in enumerate(zip(links, results), start=1):
        logger.info(f"  Processing {position}/{len(links)}: {link}")

        if isinstance(result, BaseException):
            # Handle errors gracefully, but leave a trace of what went wrong
            failed += 1
            logger.warning(f"⚠️ Failed to fetch {link}: {result!r}")
            json_data.append({"link": link, "content": "Fail to fetch content..."})
            continue

        # Successfully fetched content - apply length limit
        content = result["text"]
        if len(content) > max_content_length:
            logger.info(
                f"✂️ Content truncated from {len(content)} to {max_content_length} characters"
            )
            content = (
                content[:max_content_length]
                + "... [content truncated due to length limit]"
            )

        json_data.append({"link": link, "content": content})

    # Ensure output directory exists
    output_file = Path(output_path)
    output_file.parent.mkdir(parents=True, exist_ok=True)

    # Save to JSON file
    with output_file.open("w", encoding="utf-8") as f:
        json.dump(json_data, f, ensure_ascii=False, indent=2)

    logger.info(f"💾 Saved content from {len(links)} links to {output_path}")

    # Print summary
    successful = len(json_data) - failed
    logger.info(f"📊 Summary: {successful} successful, {failed} failed")