import asyncio
import json
import logging
from pathlib import Path
from typing import Literal, TypedDict

from playwright.async_api import Page, async_playwright

READABILITY_JS_URL = "https://unpkg.com/@mozilla/readability@0.4.4/Readability.js"

logger = logging.getLogger("uvicorn.error")


class PageText(TypedDict):
    url: str
    text: str


WaitUntil = Literal["load", "domcontentloaded", "networkidle", "commit"]


async def _inject_readability(page: Page) -> None:
    # Readability.js only makes sense on HTML documents; skip XML, PDF viewers, etc.
    is_html = await page.evaluate("() => document.documentElement.nodeName === 'HTML'")
    if not is_html:
        return
    await page.add_script_tag(url=READABILITY_JS_URL)
    # Parse a clone so Readability does not mutate the live DOM.
    await page.add_script_tag(
        content="window.__readability__ = new Readability(document.cloneNode(true));"
    )


async def _fetch_text(page: Page, url: str, wait_until: WaitUntil) -> str:
    await page.goto(url, wait_until=wait_until)
    # Give late-running scripts a moment to render content.
    await page.wait_for_timeout(1000)

    # Attempt Readability.js article extraction first.
    try:
        await _inject_readability(page)
        readability_text = await page.evaluate(
            "() => window.__readability__.parse()?.textContent"
        )
        if readability_text:
            return readability_text.strip()
    except Exception:
        pass

    # Fallback: Twitter/X-specific extraction of tweet text.
    try:
        tweet_text = await page.locator(
            "article div[data-testid='tweetText']"
        ).all_inner_texts()
        if tweet_text:
            return "\n".join(tweet_text)
    except Exception:
        pass

    # Final fallback: the full visible body text.
    return await page.evaluate("() => document.body.innerText")


async def fetch_text(
    url: str, headless: bool = False, wait_until: WaitUntil = "load"
) -> PageText:
    async with async_playwright() as pw:
        # An empty user_data_dir makes Playwright create a temporary profile.
        browser = await pw.chromium.launch_persistent_context(
            user_data_dir="",
            channel="chrome",
            headless=headless,
            no_viewport=True,
        )
        page = await browser.new_page()
        text = await _fetch_text(page, url, wait_until)
        await browser.close()
    return PageText(url=url, text=text)


async def fetch_texts(
    urls: list[str], headless: bool = False, wait_until: WaitUntil = "load"
) -> list[PageText | BaseException]:
    async with async_playwright() as pw:
        browser = await pw.chromium.launch_persistent_context(
            user_data_dir="",
            channel="chrome",
            headless=headless,
            no_viewport=True,
        )
        # One page per URL so all fetches run concurrently in a single browser.
        pages = [await browser.new_page() for _ in urls]
        tasks = [_fetch_text(page, url, wait_until) for page, url in zip(pages, urls)]
        results_raw = await asyncio.gather(*tasks, return_exceptions=True)
        await browser.close()

    results: list[PageText | BaseException] = []
    for url, result in zip(urls, results_raw):
        if isinstance(result, BaseException):
            results.append(result)
        else:
            results.append(PageText(url=url, text=result))
    return results


async def fetch_links_to_json(
    links: list[str],
    output_path: str,
    headless: bool = False,
    wait_until: WaitUntil = "load",
    max_content_length: int = 5000,
) -> None:
    """Fetch content from a list of links and save it to a JSON file.

    Args:
        links: List of URLs to fetch content from.
        output_path: Path where the JSON file will be saved.
        headless: Whether to run the browser in headless mode.
        wait_until: When to consider page loading complete.
        max_content_length: Maximum number of characters to keep from each page.

    Returns:
        None (results are saved to the JSON file).
    """
    logger.info(f"📥 Fetching content from {len(links)} links...")

    # Fetch content from all links concurrently.
    results = await fetch_texts(links, headless=headless, wait_until=wait_until)

    # Process results into the desired format.
    json_data = []
    for i, (link, result) in enumerate(zip(links, results)):
        logger.info(f"  Processing {i + 1}/{len(links)}: {link}")
        if isinstance(result, BaseException):
            # Record failures so the output keeps one entry per link.
            json_data.append({"link": link, "content": "Error fetching content."})
        else:
            # Successfully fetched content; apply the length limit.
            content = result["text"]
            if len(content) > max_content_length:
                content = (
                    content[:max_content_length]
                    + "... [content truncated due to length limit]"
                )
                logger.info(
                    f"✂️ Content truncated from {len(result['text'])} to {max_content_length} characters"
                )
            json_data.append({"link": link, "content": content})

    # Ensure the output directory exists.
    output_file = Path(output_path)
    output_file.parent.mkdir(parents=True, exist_ok=True)

    # Save to a JSON file.
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(json_data, f, ensure_ascii=False, indent=2)

    logger.info(f"💾 Saved content from {len(links)} links to {output_path}")

    # Log a summary; the prefix must match the failure message written above.
    successful = sum(
        1 for item in json_data if not item["content"].startswith("Error fetching")
    )
    failed = len(json_data) - successful
    logger.info(f"📊 Summary: {successful} successful, {failed} failed")
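

# Minimal usage sketch (not part of the original module): the example URLs,
# the output path, and this __main__ entry point are illustrative assumptions.
# Running the file would fetch the listed links headlessly and write the
# extracted text to fetched_content.json.
if __name__ == "__main__":
    example_links = [
        "https://example.com",  # placeholder URL, for illustration only
        "https://en.wikipedia.org/wiki/Web_scraping",  # placeholder URL
    ]
    logging.basicConfig(level=logging.INFO)
    asyncio.run(
        fetch_links_to_json(
            example_links,
            output_path="fetched_content.json",  # assumed output location
            headless=True,
        )
    )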