# ==============================================
# NEWS CONTENT EXTRACTOR WITH READABILITY
# ==============================================
import json
import logging
import re
import time
import traceback
from typing import Dict, Any

import gradio as gr
import requests
import urllib3
import uvicorn
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from readability import Document

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# requests is called with verify=False below; silence the resulting warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class NewsArticleExtractor:
    """Extract news articles using readability-lxml."""

    def __init__(self):
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
        ]

    def extract_article(self, url: str) -> Dict[str, Any]:
        """Extract article content, trying multiple methods and keeping the best result."""
        start_time = time.time()
        logger.info(f"📰 Extracting article from: {url}")

        # Ensure the URL has a protocol
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        # Extraction methods, in order of preference
        methods = [
            self._extract_with_readability,
            self._extract_with_jina,
            self._extract_with_selectors,
            self._extract_fallback,
        ]

        best_result = None
        best_score = 0

        for i, method in enumerate(methods):
            try:
                logger.info(f"  Trying method {i + 1}: {method.__name__}")
                result = method(url)
                if result.get("success"):
                    # Score the article
                    score = self._score_article(result)
                    result["score"] = score
                    logger.info(f"  ✓ Method {i + 1} score: {score}")
                    if score > best_score:
                        best_score = score
                        best_result = result
                    # If we have a good score, return early
                    if score > 50:
                        break
            except Exception as e:
                logger.error(f"  Method {i + 1} failed: {e}")
                time.sleep(1)

        if best_result and best_score > 20:
            best_result["execution_time"] = round(time.time() - start_time, 2)
            best_result["method"] = "article_extraction"
            return best_result

        return {
            "success": False,
            "url": url,
            "error": "Could not extract article content",
            "execution_time": round(time.time() - start_time, 2),
        }

    def _extract_with_readability(self, url: str) -> Dict[str, Any]:
        """Use readability-lxml to extract article content."""
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
                "Accept-Encoding": "gzip, deflate, br",
                "DNT": "1",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "none",
                "Sec-Fetch-User": "?1",
                "Cache-Control": "max-age=0",
                "Referer": "https://www.google.com/",  # Pretend we came from Google
            }

            response = requests.get(url, headers=headers, timeout=20, verify=False)

            if response.status_code == 200:
                # Parse with readability
                doc = Document(response.text)
                article_html = doc.summary()
                title = doc.title()

                # Convert HTML to clean text
                soup = BeautifulSoup(article_html, 'html.parser')
                article_text = soup.get_text(separator='\n', strip=True)

                # Clean the text
                cleaned_text = self._clean_article_text(article_text)

                if len(cleaned_text) > 200:
                    # Extract metadata
                    metadata = self._extract_metadata(response.text)
                    return {
                        "success": True,
                        "url": url,
                        "title": title[:200],
                        "main_content": cleaned_text,
                        "content_length": len(cleaned_text),
                        "content_preview": cleaned_text[:500] + ("..." if len(cleaned_text) > 500 else ""),
                        "source": "readability",
                        "status": response.status_code,
                        "metadata": metadata,
                    }

            return {"success": False, "error": f"Status: {response.status_code}"}
        except Exception as e:
            return {"success": False, "error": f"Readability error: {str(e)}"}

    def _extract_with_jina(self, url: str) -> Dict[str, Any]:
        """Try the Jina Reader proxy with different Accept headers."""
        try:
            jina_url = f"https://r.jina.ai/{url}"
            accept_headers = ["text/plain", "application/json", "text/markdown"]

            for accept in accept_headers:
                try:
                    response = requests.get(
                        jina_url,
                        headers={
                            "Accept": accept,
                            "User-Agent": self.user_agents[0],
                        },
                        timeout=25,
                    )

                    if response.status_code == 200:
                        content = response.text

                        # Parse based on the requested content type
                        if accept == "application/json":
                            try:
                                data = json.loads(content)
                                content = data.get("content", content)
                            except json.JSONDecodeError:
                                pass

                        # Clean content
                        cleaned = self._clean_article_text(content)

                        # Extract a title from the first few lines, if present
                        title = "Jina提取"  # "Extracted by Jina"
                        for line in content.split('\n')[:5]:
                            if line.startswith('Title:') or line.startswith('# '):
                                title = line.replace('Title:', '').replace('# ', '').strip()
                                break

                        if len(cleaned) > 200:
                            return {
                                "success": True,
                                "url": url,
                                "title": title[:200],
                                "main_content": cleaned,
                                "content_length": len(cleaned),
                                "source": f"jina_{accept}",
                                "status": response.status_code,
                            }
                except Exception as e:
                    logger.warning(f"Jina attempt with {accept} failed: {e}")
                    continue

            return {"success": False, "error": "All Jina attempts failed"}
        except Exception as e:
            return {"success": False, "error": f"Jina error: {str(e)}"}

    def _extract_with_selectors(self, url: str) -> Dict[str, Any]:
        """Extract using site-specific selectors (tuned for sinchew.com.my)."""
        try:
            headers = {
                "User-Agent": self.user_agents[1],
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            }

            response = requests.get(url, headers=headers, timeout=15, verify=False)

            if response.status_code == 200:
                soup = BeautifulSoup(response.content, 'html.parser')

                # Remove unwanted elements
                for unwanted in soup.find_all(['script', 'style', 'nav', 'header', 'footer',
                                               'aside', 'form', 'iframe', 'button', 'svg']):
                    unwanted.decompose()

                # Selectors commonly used by news sites, including sinchew.com.my
                selectors_to_try = [
                    'div.entry-content',
                    'article',
                    'div.post-content',
                    'div.content-area',
                    'div.article-content',
                    'div.story-content',
                    'div[itemprop="articleBody"]',
                    'div.article-body',
                    'div.main-content',
                    'div.news-content',
                ]

                article_text = ""
                for selector in selectors_to_try:
                    element = soup.select_one(selector)
                    if element:
                        text = element.get_text(separator='\n', strip=True)
                        if len(text) > len(article_text):
                            article_text = text

                # If the specific selectors didn't work, look for paragraphs with Chinese text
                if len(article_text) < 300:
                    chinese_paragraphs = []
                    for p in soup.find_all('p'):
                        text = p.get_text(strip=True)
                        if text and len(text) > 50:
                            # Check if it contains Chinese characters
                            if re.search(r'[\u4e00-\u9fff]', text):
                                chinese_paragraphs.append(text)
                    if chinese_paragraphs:
                        article_text = '\n\n'.join(chinese_paragraphs[:20])  # Limit to 20 paragraphs

                # Clean the text
                cleaned_text = self._clean_article_text(article_text)

                if len(cleaned_text) > 200:
                    # Extract title and date
                    title = soup.find('title')
                    title_text = title.get_text(strip=True) if title else "新闻标题"  # "News title"
                    date = self._extract_date_from_soup(soup)
                    return {
                        "success": True,
                        "url": url,
                        "title": title_text[:200],
                        "date": date,
                        "main_content": cleaned_text,
                        "content_length": len(cleaned_text),
                        "source": "selectors",
                        "status": response.status_code,
                    }

            return {"success": False, "error": f"Status: {response.status_code}"}
        except Exception as e:
            return {"success": False, "error": f"Selector error: {str(e)}"}

    def _extract_fallback(self, url: str) -> Dict[str, Any]:
        """Last-resort extraction: strip noise tags and keep substantial Chinese lines."""
        try:
            response = requests.get(url, timeout=10, verify=False)

            if response.status_code == 200:
                soup = BeautifulSoup(response.content, 'html.parser')

                # Remove noise tags
                for tag in soup.find_all(['script', 'style', 'nav', 'header', 'footer',
                                          'aside', 'form', 'iframe', 'button']):
                    tag.decompose()

                # Get all text and keep only substantial content lines
                all_text = soup.get_text(separator='\n', strip=True)
                filtered_lines = []
                for line in all_text.split('\n'):
                    line = line.strip()
                    if (len(line) > 30  # Minimum length
                            and re.search(r'[\u4e00-\u9fff]', line)  # Contains Chinese
                            and not re.search(r'cookie|privacy|copyright|advertisement|newsletter|subscribe',
                                              line.lower())
                            and not line.startswith('http')):
                        filtered_lines.append(line)

                cleaned_text = '\n\n'.join(filtered_lines[:50])

                if len(cleaned_text) > 200:
                    title = soup.find('title')
                    title_text = title.get_text(strip=True) if title else "内容提取"  # "Content extraction"
                    return {
                        "success": True,
                        "url": url,
                        "title": title_text[:150],
                        "main_content": cleaned_text,
                        "content_length": len(cleaned_text),
                        "source": "fallback",
                    }

            return {"success": False, "error": "Fallback extraction failed"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _extract_metadata(self, html_content: str) -> Dict[str, str]:
        """Extract date and author metadata from raw HTML."""
        metadata = {}
        soup = BeautifulSoup(html_content, 'html.parser')

        # Extract date
        date = self._extract_date_from_soup(soup)
        if date:
            metadata["date"] = date

        # Extract author
        author_selectors = [
            'meta[name="author"]',
            'meta[property="article:author"]',
            '.author',
            '.byline',
            'span[itemprop="author"]',
        ]
        for selector in author_selectors:
            element = soup.select_one(selector)
            if element:
                if element.name == 'meta':
                    author = element.get('content', '')
                else:
                    author = element.get_text(strip=True)
                if author:
                    metadata["author"] = author
                    break

        return metadata

    def _extract_date_from_soup(self, soup) -> str:
        """Extract a publication date from a BeautifulSoup object."""
        date_selectors = [
            'meta[property="article:published_time"]',
            'meta[name="pubdate"]',
            'meta[name="date"]',
            'time',
            '.date',
            '.published',
            '.post-date',
            '.article-date',
        ]

        for selector in date_selectors:
            element = soup.select_one(selector)
            if element:
                if element.name == 'meta':
                    date_str = element.get('content', '')
                elif element.name == 'time':
                    date_str = element.get('datetime', '') or element.get_text(strip=True)
                else:
                    date_str = element.get_text(strip=True)

                if date_str:
                    # Try to recognize a date in common formats
                    date_patterns = [
                        r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}',
                        r'\d{4}/\d{2}/\d{2}',
                        r'\d{4}-\d{2}-\d{2}',
                        r'\d{2}/\d{2}/\d{4}',
                    ]
                    for pattern in date_patterns:
                        match = re.search(pattern, date_str)
                        if match:
                            return match.group()

        return ""

    def _clean_article_text(self, text: str) -> str:
        """Clean article text: strip boilerplate, short lines, and duplicates."""
        if not text:
            return ""

        # Remove image markers and other noise
        patterns_to_remove = [
            r'!\[Image \d+: .*?\]',
            r'Image \d+:',
            r'ADVERTISEMENT',
            r'Sponsored Content',
            r'点击这里.*',        # "Click here..."
            r'更多新闻.*',        # "More news..."
            r'相关新闻.*',        # "Related news..."
            r'热门搜索.*',        # "Popular searches..."
            r'大事件.*',          # "Major events..."
            r'Copyright.*All rights reserved',
            r'本网站.*Cookies',   # "This website ... Cookies"
            r'了解更多.*',        # "Learn more..."
            r'接受.*',            # "Accept..."
            r'简\s*繁',           # Simplified/Traditional script toggle
            r'登入.*',            # "Log in..."
            r'下载APP.*',         # "Download the app..."
            r'[\*\-\=]{5,}',
            r'^\s*\d+\s*$',       # Lines containing only numbers
        ]
        for pattern in patterns_to_remove:
            text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.MULTILINE)

        # Split into lines and keep only substantial, non-boilerplate lines
        cleaned_lines = []
        for line in text.split('\n'):
            line = line.strip()
            if (len(line) > 20  # Minimum length
                    and not line.startswith(('http://', 'https://', 'www.'))
                    and not re.search(r'^[\d\s\.\-]+$', line)  # Not just numbers/dashes
                    and not re.search(r'cookie|隐私|版权|广告', line.lower())):  # privacy/copyright/ads
                cleaned_lines.append(line)

        # Remove consecutive duplicate lines
        unique_lines = []
        for i, line in enumerate(cleaned_lines):
            if i == 0 or line != cleaned_lines[i - 1]:
                unique_lines.append(line)

        # Join with paragraph breaks
        text = '\n\n'.join(unique_lines)

        # Final cleanup: collapse runs of blank lines, then runs of spaces/tabs.
        # (Collapsing all whitespace with r'\s+' would also destroy the
        # paragraph breaks just inserted, so only horizontal whitespace is collapsed.)
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r'[ \t]+', ' ', text)

        return text.strip()

    def _score_article(self, result: Dict[str, Any]) -> int:
        """Score article quality with simple length, structure, and keyword heuristics."""
        if not result.get("success"):
            return 0

        score = 0
        content = result.get("main_content", "")

        # Length score
        length = len(content)
        if length > 800:
            score += 30
        elif length > 500:
            score += 20
        elif length > 300:
            score += 10

        # Paragraph count
        paragraphs = content.count('\n\n') + 1
        if paragraphs > 3:
            score += 15
        elif paragraphs > 1:
            score += 5

        # Chinese news-reporting keywords ("report", "journalist", "police",
        # "investigation", "stated", "according to", "incident", etc.)
        news_keywords_chinese = ['报道', '新闻', '记者', '警方', '调查', '发生', '表示', '指出',
                                 '据知', '据了解', '据悉', '事件', '事故', '案件', '透露',
                                 '说明', '强调', '要求', '建议', '认为']
        for keyword in news_keywords_chinese:
            if keyword in content:
                score += 2

        # Bonus for containing Chinese text at all
        if re.search(r'[\u4e00-\u9fff]', content):
            score += 20

        # Source bonus
        if "readability" in result.get("source", ""):
            score += 10

        return score


# ==============================================
# INITIALIZE
# ==============================================
extractor = NewsArticleExtractor()

# ==============================================
# FASTAPI APP
# ==============================================
fastapi_app = FastAPI(
    title="News Article Extractor",
    description="Extracts news articles using readability-lxml",
    version="4.0",
)

fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@fastapi_app.get("/")
async def root():
    return {
        "service": "News Article Extractor",
        "version": "4.0",
        "description": "Extracts news articles using multiple methods including readability-lxml",
        "endpoints": {
            "GET /": "This info",
            "GET /health": "Health check",
            "POST /extract": "Extract article content",
        },
    }


@fastapi_app.get("/health")
async def health():
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "service": "article_extractor",
    }


@fastapi_app.post("/extract")
async def api_extract(request: Request):
    """API endpoint for n8n."""
    try:
        body = await request.json()
        url = body.get("url", "").strip()

        if not url:
            return JSONResponse(
                status_code=400,
                content={"success": False, "error": "URL is required"},
            )

        logger.info(f"📰 API Request: {url}")
        start_time = time.time()
        result = extractor.extract_article(url)
        elapsed = time.time() - start_time

        logger.info(f"  Extraction completed in {elapsed:.2f}s")
        logger.info(f"  Success: {result.get('success')}")
        logger.info(f"  Content length: {result.get('content_length', 0)}")
        logger.info(f"  Method used: {result.get('method', 'unknown')}")

        return result
    except json.JSONDecodeError:
        return JSONResponse(
            status_code=400,
            content={"success": False, "error": "Invalid JSON"},
        )
    except Exception as e:
        logger.error(f"API Error: {traceback.format_exc()}")
        return JSONResponse(
            status_code=500,
            content={"success": False, "error": str(e)},
        )


# ==============================================
# GRADIO INTERFACE
# ==============================================
def gradio_extract(url: str):
    """Gradio interface handler."""
    if not url:
        return "❌ 请输入URL", {}  # "Please enter a URL"

    result = extractor.extract_article(url)

    if result["success"]:
        content = result["main_content"]
        title = result.get("title", "无标题")  # "Untitled"

        # Format output nicely (labels: method / time / length, in Chinese)
        output = f"""## 📰 {title}

**URL:** {result['url']}
**提取方法:** {result.get('method', '未知')}
**提取时间:** {result['execution_time']}秒
**内容长度:** {result['content_length']}字符

---

{content}

---
*提取完成于 {time.strftime('%Y-%m-%d %H:%M:%S')}*
"""
        return output, result
    else:
        error = result.get("error", "未知错误")  # "Unknown error"
        return f"## ❌ 提取失败\n\n**错误:** {error}\n\n**URL:** {result.get('url', '未知')}", result


# Create Gradio interface
gradio_interface = gr.Interface(
    fn=gradio_extract,
    inputs=gr.Textbox(
        label="新闻文章URL",
        placeholder="https://example.com/news/article",
        value="https://northern.sinchew.com.my/?p=7217886",
    ),
    outputs=[
        gr.Markdown(label="文章内容"),
        gr.JSON(label="原始数据"),
    ],
    title="📰 新闻文章提取器 v4.0",
    description="使用readability-lxml提取新闻文章主要内容",
    examples=[
        ["https://northern.sinchew.com.my/?p=7217886"],
        ["https://www.sinchew.com.my/?p=7234965"],
        ["https://www.zaobao.com.sg/realtime/china/story20250127-1525893"],
    ],
)

# ==============================================
# MOUNT GRADIO TO FASTAPI
# ==============================================
app = gr.mount_gradio_app(fastapi_app, gradio_interface, path="/")

# ==============================================
# LAUNCH THE APP
# ==============================================
if __name__ == "__main__":
    print("\n" + "=" * 60)
    print("📰 新闻文章提取器 v4.0 启动")  # "News Article Extractor v4.0 starting"
    print("=" * 60)
    print("特性:")  # Features
    print("• 使用readability-lxml进行智能文章提取")  # Smart extraction via readability-lxml
    print("• 多种提取方法备用")  # Multiple fallback extraction methods
    print("• 专门优化中文新闻网站")  # Optimized for Chinese news sites
    print("• 自动内容评分系统")  # Automatic content scoring
    print("=" * 60)
    print("API端点:")  # API endpoints
    print("• GET /health - 健康检查")  # Health check
    print("• POST /extract - 提取文章内容")  # Extract article content
    print("=" * 60 + "\n")

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
    )
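
# ----------------------------------------------------------------------
# Sketch: what a client (e.g. an n8n HTTP Request node) would send to the
# POST /extract endpoint above. `build_extract_request` is a hypothetical
# helper, not part of the service; the endpoint itself only requires a
# JSON body with a single "url" key, and the host/port below assume the
# uvicorn.run() configuration in this file (0.0.0.0:7860).
# ----------------------------------------------------------------------

import json


def build_extract_request(url: str) -> dict:
    """Build the JSON body that POST /extract expects: {"url": ...}.

    The server strips whitespace from the URL as well, so the strip()
    here is just defensive.
    """
    return {"url": url.strip()}


payload = build_extract_request("  https://example.com/news/article  ")
print(json.dumps(payload))
# A client would then POST the payload, for example:
#   requests.post("http://localhost:7860/extract", json=payload, timeout=60)
# and read "success", "title", and "main_content" from the JSON response.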