# yukee1992 — "Update app.py" (commit 60c2e0a, verified)
# (HuggingFace web-page residue commented out so the file is valid Python.)
# ==============================================
# NEWS CONTENT EXTRACTOR WITH READABILITY
# ==============================================
import gradio as gr
import requests
import json
import time
import re
import html
from typing import Dict, Any
from fastapi import FastAPI, Request
import uvicorn
import traceback
from bs4 import BeautifulSoup
from readability import Document
import logging
# Set up module-level logging at INFO so extraction progress and per-method
# scores are visible in the container/Space logs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ==============================================
# NEWS CONTENT EXTRACTOR WITH READABILITY
# ==============================================
class NewsArticleExtractor:
"""Extract news articles using readability-lxml"""
def __init__(self):
self.user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
]
def extract_article(self, url: str) -> Dict[str, Any]:
"""Extract article content using multiple methods"""
start_time = time.time()
logger.info(f"📰 Extracting article from: {url}")
# Ensure URL has protocol
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
# Try multiple extraction methods
methods = [
self._extract_with_readability,
self._extract_with_jina,
self._extract_with_selectors,
self._extract_fallback,
]
best_result = None
best_score = 0
for i, method in enumerate(methods):
try:
logger.info(f" Trying method {i+1}: {method.__name__}")
result = method(url)
if result.get("success"):
# Score the article
score = self._score_article(result)
result["score"] = score
logger.info(f" ✓ Method {i+1} score: {score}")
if score > best_score:
best_score = score
best_result = result
# If we have a good score, return early
if score > 50:
break
except Exception as e:
logger.error(f" Method {i+1} failed: {e}")
time.sleep(1)
if best_result and best_score > 20:
best_result["execution_time"] = round(time.time() - start_time, 2)
best_result["method"] = "article_extraction"
return best_result
return {
"success": False,
"url": url,
"error": "Could not extract article content",
"execution_time": round(time.time() - start_time, 2)
}
def _extract_with_readability(self, url: str) -> Dict[str, Any]:
"""Use readability-lxml to extract article content"""
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Cache-Control": "max-age=0",
"Referer": "https://www.google.com/", # Pretend we came from Google
}
response = requests.get(url, headers=headers, timeout=20, verify=False)
if response.status_code == 200:
# Parse with readability
doc = Document(response.text)
# Extract content
article_html = doc.summary()
title = doc.title()
# Convert HTML to clean text
soup = BeautifulSoup(article_html, 'html.parser')
article_text = soup.get_text(separator='\n', strip=True)
# Clean the text
cleaned_text = self._clean_article_text(article_text)
if len(cleaned_text) > 200:
# Extract metadata
metadata = self._extract_metadata(response.text)
return {
"success": True,
"url": url,
"title": title[:200],
"main_content": cleaned_text,
"content_length": len(cleaned_text),
"content_preview": cleaned_text[:500] + ("..." if len(cleaned_text) > 500 else ""),
"source": "readability",
"status": response.status_code,
"metadata": metadata
}
return {"success": False, "error": f"Status: {response.status_code}"}
except Exception as e:
return {"success": False, "error": f"Readability error: {str(e)}"}
def _extract_with_jina(self, url: str) -> Dict[str, Any]:
"""Try Jina Reader with different parameters"""
try:
jina_url = f"https://r.jina.ai/{url}"
# Try with different accept headers
accept_headers = [
"text/plain",
"application/json",
"text/markdown"
]
for accept in accept_headers:
try:
response = requests.get(
jina_url,
headers={
"Accept": accept,
"User-Agent": self.user_agents[0]
},
timeout=25
)
if response.status_code == 200:
content = response.text
# Parse based on content type
if accept == "application/json":
try:
data = json.loads(content)
content = data.get("content", content)
except:
pass
# Clean content
cleaned = self._clean_article_text(content)
# Extract title
title = "Jina提取"
lines = content.split('\n')
for line in lines[:5]:
if line.startswith('Title:') or line.startswith('# '):
title = line.replace('Title:', '').replace('# ', '').strip()
break
if len(cleaned) > 200:
return {
"success": True,
"url": url,
"title": title[:200],
"main_content": cleaned,
"content_length": len(cleaned),
"source": f"jina_{accept}",
"status": response.status_code
}
except Exception as e:
logger.warning(f"Jina attempt with {accept} failed: {e}")
continue
return {"success": False, "error": "All Jina attempts failed"}
except Exception as e:
return {"success": False, "error": f"Jina error: {str(e)}"}
def _extract_with_selectors(self, url: str) -> Dict[str, Any]:
"""Extract using specific selectors for sinchew.com.my"""
try:
headers = {
"User-Agent": self.user_agents[1],
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
response = requests.get(url, headers=headers, timeout=15, verify=False)
if response.status_code == 200:
soup = BeautifulSoup(response.content, 'html.parser')
# Remove unwanted elements
for unwanted in soup.find_all(['script', 'style', 'nav', 'header', 'footer',
'aside', 'form', 'iframe', 'button', 'svg']):
unwanted.decompose()
# Try specific selectors for sinchew.com.my
selectors_to_try = [
'div.entry-content',
'article',
'div.post-content',
'div.content-area',
'div.article-content',
'div.story-content',
'div[itemprop="articleBody"]',
'div.article-body',
'div.main-content',
'div.news-content',
]
article_text = ""
for selector in selectors_to_try:
element = soup.select_one(selector)
if element:
text = element.get_text(separator='\n', strip=True)
if len(text) > len(article_text):
article_text = text
# If specific selectors didn't work, try finding the main content
if len(article_text) < 300:
# Look for paragraphs with Chinese text
all_p = soup.find_all('p')
chinese_paragraphs = []
for p in all_p:
text = p.get_text(strip=True)
if text and len(text) > 50:
# Check if it contains Chinese characters
if re.search(r'[\u4e00-\u9fff]', text):
chinese_paragraphs.append(text)
if chinese_paragraphs:
article_text = '\n\n'.join(chinese_paragraphs[:20]) # Limit to 20 paragraphs
# Clean the text
cleaned_text = self._clean_article_text(article_text)
if len(cleaned_text) > 200:
# Extract title
title = soup.find('title')
title_text = title.get_text(strip=True) if title else "新闻标题"
# Extract date
date = self._extract_date_from_soup(soup)
return {
"success": True,
"url": url,
"title": title_text[:200],
"date": date,
"main_content": cleaned_text,
"content_length": len(cleaned_text),
"source": "selectors",
"status": response.status_code
}
return {"success": False, "error": f"Status: {response.status_code}"}
except Exception as e:
return {"success": False, "error": f"Selector error: {str(e)}"}
def _extract_fallback(self, url: str) -> Dict[str, Any]:
"""Fallback extraction method"""
try:
response = requests.get(url, timeout=10, verify=False)
if response.status_code == 200:
# Use BeautifulSoup to get clean text
soup = BeautifulSoup(response.content, 'html.parser')
# Remove all tags except p, div, span
for tag in soup.find_all(['script', 'style', 'nav', 'header', 'footer',
'aside', 'form', 'iframe', 'button']):
tag.decompose()
# Get text and filter
all_text = soup.get_text(separator='\n', strip=True)
lines = all_text.split('\n')
# Filter lines
filtered_lines = []
for line in lines:
line = line.strip()
if (len(line) > 30 and # Minimum length
re.search(r'[\u4e00-\u9fff]', line) and # Contains Chinese
not re.search(r'cookie|privacy|copyright|advertisement|newsletter|subscribe',
line.lower()) and
not line.startswith('http')):
filtered_lines.append(line)
cleaned_text = '\n\n'.join(filtered_lines[:50])
if len(cleaned_text) > 200:
title = soup.find('title')
title_text = title.get_text(strip=True) if title else "内容提取"
return {
"success": True,
"url": url,
"title": title_text[:150],
"main_content": cleaned_text,
"content_length": len(cleaned_text),
"source": "fallback"
}
return {"success": False, "error": "Fallback extraction failed"}
except Exception as e:
return {"success": False, "error": str(e)}
def _extract_metadata(self, html_content: str) -> Dict[str, str]:
"""Extract metadata from HTML"""
metadata = {}
soup = BeautifulSoup(html_content, 'html.parser')
# Extract date
date = self._extract_date_from_soup(soup)
if date:
metadata["date"] = date
# Extract author
author_selectors = [
'meta[name="author"]',
'meta[property="article:author"]',
'.author',
'.byline',
'span[itemprop="author"]',
]
for selector in author_selectors:
element = soup.select_one(selector)
if element:
if element.name == 'meta':
author = element.get('content', '')
else:
author = element.get_text(strip=True)
if author:
metadata["author"] = author
break
return metadata
def _extract_date_from_soup(self, soup) -> str:
"""Extract date from BeautifulSoup object"""
date_selectors = [
'meta[property="article:published_time"]',
'meta[name="pubdate"]',
'meta[name="date"]',
'time',
'.date',
'.published',
'.post-date',
'.article-date',
]
for selector in date_selectors:
element = soup.select_one(selector)
if element:
if element.name == 'meta':
date_str = element.get('content', '')
elif element.name == 'time':
date_str = element.get('datetime', '') or element.get_text(strip=True)
else:
date_str = element.get_text(strip=True)
if date_str:
# Try to parse date
date_patterns = [
r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}',
r'\d{4}/\d{2}/\d{2}',
r'\d{4}-\d{2}-\d{2}',
r'\d{2}/\d{2}/\d{4}',
]
for pattern in date_patterns:
match = re.search(pattern, date_str)
if match:
return match.group()
return ""
def _clean_article_text(self, text: str) -> str:
"""Clean article text"""
if not text:
return ""
# Remove image markers and other noise
patterns_to_remove = [
r'!\[Image \d+: .*?\]',
r'Image \d+:',
r'ADVERTISEMENT',
r'Sponsored Content',
r'点击这里.*',
r'更多新闻.*',
r'相关新闻.*',
r'热门搜索.*',
r'大事件.*',
r'Copyright.*All rights reserved',
r'本网站.*Cookies',
r'了解更多.*',
r'接受.*',
r'简\s*繁',
r'登入.*',
r'下载APP.*',
r'[\*\-\=]{5,}',
r'^\s*\d+\s*$', # Line with only numbers
]
for pattern in patterns_to_remove:
text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.MULTILINE)
# Split into lines and clean
lines = text.split('\n')
cleaned_lines = []
for line in lines:
line = line.strip()
if (len(line) > 20 and # Minimum length
not line.startswith(('http://', 'https://', 'www.')) and
not re.search(r'^[\d\s\.\-]+$', line) and # Not just numbers/dashes
not re.search(r'cookie|隐私|版权|广告', line.lower())):
cleaned_lines.append(line)
# Remove duplicate consecutive lines
unique_lines = []
for i, line in enumerate(cleaned_lines):
if i == 0 or line != cleaned_lines[i-1]:
unique_lines.append(line)
# Join with paragraph breaks
text = '\n\n'.join(unique_lines)
# Final cleanup
text = re.sub(r'\n{3,}', '\n\n', text)
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _score_article(self, result: Dict[str, Any]) -> int:
"""Score article quality"""
if not result.get("success"):
return 0
score = 0
content = result.get("main_content", "")
# Length score
length = len(content)
if length > 800:
score += 30
elif length > 500:
score += 20
elif length > 300:
score += 10
# Paragraph count
paragraphs = content.count('\n\n') + 1
if paragraphs > 3:
score += 15
elif paragraphs > 1:
score += 5
# News keywords in Chinese
news_keywords_chinese = ['报道', '新闻', '记者', '警方', '调查', '发生', '表示',
'指出', '据知', '据了解', '据悉', '事件', '事故', '案件',
'透露', '说明', '强调', '要求', '建议', '认为']
for keyword in news_keywords_chinese:
if keyword in content:
score += 2
# Check for Chinese text
if re.search(r'[\u4e00-\u9fff]', content):
score += 20
# Source bonus
source = result.get("source", "")
if "readability" in source:
score += 10
return score
# ==============================================
# INITIALIZE
# ==============================================
# Single shared extractor instance used by both the REST API and the Gradio UI.
extractor = NewsArticleExtractor()
# ==============================================
# FASTAPI APP
# ==============================================
fastapi_app = FastAPI(
    title="News Article Extractor",
    description="Extracts news articles using readability-lxml",
    version="4.0"
)
# NOTE(review): mid-file imports — consider moving to the top-of-file import block.
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# Wide-open CORS so external automations (e.g. n8n) can call the API directly.
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@fastapi_app.get("/")
async def root():
    """Service banner: name, version and the available endpoints."""
    info = {
        "service": "News Article Extractor",
        "version": "4.0",
        "description": "Extracts news articles using multiple methods including readability-lxml",
        "endpoints": {
            "GET /": "This info",
            "GET /health": "Health check",
            "POST /extract": "Extract article content"
        }
    }
    return info
@fastapi_app.get("/health")
async def health():
    """Liveness probe for uptime monitoring."""
    status = {
        "status": "healthy",
        "timestamp": time.time(),
        "service": "article_extractor"
    }
    return status
@fastapi_app.post("/extract")
async def api_extract(request: Request):
    """API endpoint for n8n"""
    try:
        payload = await request.json()
        target = payload.get("url", "").strip()
        # Reject requests without a URL up front.
        if not target:
            return JSONResponse(
                status_code=400,
                content={"success": False, "error": "URL is required"}
            )
        logger.info(f"📰 API Request: {target}")
        started = time.time()
        result = extractor.extract_article(target)
        took = time.time() - started
        logger.info(f" Extraction completed in {took:.2f}s")
        logger.info(f" Success: {result.get('success')}")
        logger.info(f" Content length: {result.get('content_length', 0)}")
        logger.info(f" Method used: {result.get('method', 'unknown')}")
        return result
    except json.JSONDecodeError:
        # Body was not valid JSON.
        return JSONResponse(
            status_code=400,
            content={"success": False, "error": "Invalid JSON"}
        )
    except Exception as e:
        logger.error(f"API Error: {traceback.format_exc()}")
        return JSONResponse(
            status_code=500,
            content={"success": False, "error": str(e)}
        )
# ==============================================
# GRADIO INTERFACE
# ==============================================
def gradio_extract(url: str):
    """Gradio interface"""
    # Guard: nothing to do without a URL.
    if not url:
        return "❌ 请输入URL", {}
    result = extractor.extract_article(url)
    if not result["success"]:
        error = result.get("error", "未知错误")
        return f"## ❌ 提取失败\n\n**错误:** {error}\n\n**URL:** {result.get('url', '未知')}", result
    content = result["main_content"]
    title = result.get("title", "无标题")
    # Assemble a markdown report around the extracted article text.
    output = f"""## 📰 {title}
**URL:** {result['url']}
**提取方法:** {result.get('method', '未知')}
**提取时间:** {result['execution_time']}
**内容长度:** {result['content_length']}字符
---
{content}
---
*提取完成于 {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
    return output, result
# Create Gradio interface: single URL textbox in, markdown article + raw
# JSON result out.  Example URLs cover three Chinese-language news sites.
gradio_interface = gr.Interface(
    fn=gradio_extract,
    inputs=gr.Textbox(
        label="新闻文章URL",
        placeholder="https://example.com/news/article",
        value="https://northern.sinchew.com.my/?p=7217886"
    ),
    outputs=[
        gr.Markdown(label="文章内容"),
        gr.JSON(label="原始数据")
    ],
    title="📰 新闻文章提取器 v4.0",
    description="使用readability-lxml提取新闻文章主要内容",
    examples=[
        ["https://northern.sinchew.com.my/?p=7217886"],
        ["https://www.sinchew.com.my/?p=7234965"],
        ["https://www.zaobao.com.sg/realtime/china/story20250127-1525893"]
    ]
)
# ==============================================
# MOUNT GRADIO TO FASTAPI
# ==============================================
# Serve the Gradio UI at "/" on the same ASGI app as the REST endpoints.
app = gr.mount_gradio_app(fastapi_app, gradio_interface, path="/")
# ==============================================
# LAUNCH THE APP
# ==============================================
if __name__ == "__main__":
    # Startup banner (Chinese): feature list and exposed API endpoints.
    print("\n" + "="*60)
    print("📰 新闻文章提取器 v4.0 启动")
    print("="*60)
    print("特性:")
    print("• 使用readability-lxml进行智能文章提取")
    print("• 多种提取方法备用")
    print("• 专门优化中文新闻网站")
    print("• 自动内容评分系统")
    print("="*60)
    print("API端点:")
    print("• GET /health - 健康检查")
    print("• POST /extract - 提取文章内容")
    print("="*60 + "\n")
    # Port 7860 is the standard HuggingFace Spaces port; bind all interfaces.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info"
    )