# geolocation/src/prompt/fetch/content_fetch.py
import asyncio
import json
import logging
from pathlib import Path
from typing import Literal, TypedDict
from playwright.async_api import Page, async_playwright
# CDN URL for Mozilla's Readability.js; injected into pages to extract article text.
READABILITY_JS_URL = "https://unpkg.com/@mozilla/readability@0.4.4/Readability.js"
# Reuse uvicorn's error logger so messages show up in the server's log stream.
logger = logging.getLogger("uvicorn.error")
# Result of fetching one page: the requested URL plus the extracted text.
PageText = TypedDict("PageText", {"url": str, "text": str})
WaitUntil = Literal["load", "domcontentloaded", "networkidle", "commit"]
async def _inject_readability(page: Page) -> None:
    """Load Readability.js into *page* and pre-parse a cloned DOM.

    Leaves the parsed instance on ``window.__readability__`` so callers can
    later evaluate ``.parse()``. Non-HTML documents (e.g. XML/SVG) are
    skipped, since Readability only makes sense for HTML.
    """
    doc_is_html = await page.evaluate(
        "() => document.documentElement.nodeName === 'HTML'"
    )
    if doc_is_html:
        await page.add_script_tag(url=READABILITY_JS_URL)
        await page.add_script_tag(
            content="window.__readability__ = new Readability(document.cloneNode(true));"
        )
async def _fetch_text(page: Page, url: str, wait_until: WaitUntil) -> str:
    """Navigate *page* to *url* and extract its readable text.

    Extraction strategy, in order of preference:
      1. Readability.js article extraction (best for article-like pages).
      2. Tweet text via a Twitter/X-specific selector.
      3. Raw ``document.body.innerText`` as a last resort.

    Args:
        page: An open Playwright page to navigate.
        url: The URL to load.
        wait_until: Page-load milestone passed to ``page.goto``.

    Returns:
        The extracted text (possibly empty for blank pages).
    """
    await page.goto(url, wait_until=wait_until)
    # Give late-loading scripts/content a moment to settle.
    await page.wait_for_timeout(1000)
    # Attempt Readability.js parsing first.
    try:
        await _inject_readability(page)
        readability_text = await page.evaluate(
            "() => window.__readability__.parse()?.textContent"
        )
        if readability_text:
            return readability_text.strip()
    # Fix: catch Exception, not BaseException — catching BaseException also
    # swallows asyncio.CancelledError (BaseException since Python 3.8) and
    # breaks task cancellation for callers using asyncio.gather/timeouts.
    except Exception:
        logger.debug("Readability extraction failed for %s", url, exc_info=True)
    # Fallback: Twitter/X-specific logic.
    try:
        tweet_text = await page.locator(
            "article div[data-testid='tweetText']"
        ).all_inner_texts()
        if tweet_text:
            return "\n".join(tweet_text)
    except Exception:
        logger.debug("Tweet extraction failed for %s", url, exc_info=True)
    # Final fallback: full body text.
    return await page.evaluate("() => document.body.innerText")
async def fetch_text(
    url: str, headless: bool = False, wait_until: WaitUntil = "load"
) -> PageText:
    """Fetch the readable text of a single URL in a fresh Chrome context.

    Args:
        url: The URL to fetch.
        headless: Whether to run the browser without a visible window.
        wait_until: Page-load milestone passed to ``page.goto``.

    Returns:
        A ``PageText`` holding the requested URL and its extracted text.
    """
    async with async_playwright() as pw:
        browser = await pw.chromium.launch_persistent_context(
            user_data_dir="",
            channel="chrome",
            headless=headless,
            no_viewport=True,
        )
        # Fix: close the context even when fetching raises — the original
        # leaked the browser context on any navigation/extraction error.
        try:
            page = await browser.new_page()
            text = await _fetch_text(page, url, wait_until)
        finally:
            await browser.close()
    return PageText(url=url, text=text)
async def fetch_texts(
    urls: list[str], headless: bool = False, wait_until: WaitUntil = "load"
) -> list[PageText | BaseException]:
    """Fetch readable text from several URLs concurrently.

    Each URL gets its own page in one shared Chrome context. Per-URL
    failures are captured instead of aborting the whole batch.

    Args:
        urls: URLs to fetch.
        headless: Whether to run the browser without a visible window.
        wait_until: Page-load milestone passed to ``page.goto``.

    Returns:
        One entry per input URL, in order: a ``PageText`` on success, or the
        exception raised while fetching that URL.
    """
    async with async_playwright() as pw:
        browser = await pw.chromium.launch_persistent_context(
            user_data_dir="",
            channel="chrome",
            # Fix: honor the caller's choice — this was hard-coded to True,
            # silently ignoring the ``headless`` parameter (unlike fetch_text).
            headless=headless,
            no_viewport=True,
        )
        try:
            pages = [await browser.new_page() for _ in urls]
            tasks = [
                _fetch_text(page, url, wait_until)
                for page, url in zip(pages, urls)
            ]
            # return_exceptions=True keeps one bad URL from failing the batch.
            results_raw = await asyncio.gather(*tasks, return_exceptions=True)
        finally:
            # Fix: close the context even if page creation or gather raises;
            # the original leaked it on error.
            await browser.close()
    results: list[PageText | BaseException] = []
    for url, result in zip(urls, results_raw):
        if isinstance(result, BaseException):
            results.append(result)
        else:
            results.append(PageText(url=url, text=result))
    return results
async def fetch_links_to_json(
    links: list[str],
    output_path: str,
    headless: bool = False,
    wait_until: WaitUntil = "load",
    max_content_length: int = 5000,
) -> None:
    """
    Fetch content from a list of links and save to a JSON file.

    Args:
        links: List of URLs to fetch content from
        output_path: Path where the JSON file will be saved
        headless: Whether to run browser in headless mode
        wait_until: When to consider page loading complete
        max_content_length: Maximum number of characters to keep from each page content

    Returns:
        None (saves results to JSON file)
    """
    # Sentinel stored for URLs that could not be fetched; reused below when
    # computing the success/failure summary so the two can never diverge.
    failure_msg = "Fail to fetch content..."

    logger.info(f"📥 Fetching content from {len(links)} links...")
    # Fetch content from all links.
    results = await fetch_texts(links, headless=headless, wait_until=wait_until)

    # Process results into the desired format.
    json_data = []
    for i, (link, result) in enumerate(zip(links, results)):
        logger.info(f" Processing {i + 1}/{len(links)}: {link}")
        if isinstance(result, BaseException):
            # Handle errors gracefully.
            json_data.append({"link": link, "content": failure_msg})
        else:
            # Successfully fetched content — apply length limit.
            content = result["text"]
            if len(content) > max_content_length:
                content = (
                    content[:max_content_length]
                    + "... [content truncated due to length limit]"
                )
                logger.info(
                    f"✂️ Content truncated from {len(result['text'])} to {max_content_length} characters"
                )
            json_data.append({"link": link, "content": content})

    # Ensure output directory exists.
    output_file = Path(output_path)
    output_file.parent.mkdir(parents=True, exist_ok=True)
    # Save to JSON file.
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(json_data, f, ensure_ascii=False, indent=2)
    logger.info(f"💾 Saved content from {len(links)} links to {output_path}")

    # Fix: the summary previously tested startswith("Error fetching"), a
    # prefix that is never written anywhere, so every link always counted
    # as successful. Compare against the actual failure sentinel instead.
    failed = sum(1 for item in json_data if item["content"] == failure_msg)
    successful = len(json_data) - failed
    logger.info(f"📊 Summary: {successful} successful, {failed} failed")