# ==============================================
# NEWS CONTENT EXTRACTOR WITH READABILITY
# ==============================================
import html
import json
import logging
import re
import time
import traceback
from typing import Dict, Any

import gradio as gr
import requests
import uvicorn
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from readability import Document
# Configure logging once at import time (INFO and above) and create the
# module-level logger used by the extractor and the API handlers.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ==============================================
# NEWS CONTENT EXTRACTOR WITH READABILITY
# ==============================================
class NewsArticleExtractor:
    """Extract the main text of a news article from a URL.

    Several strategies are tried in order (readability-lxml, Jina Reader,
    CSS selectors, plain-text fallback).  Every successful result is scored
    by ``_score_article`` and the best-scoring result is returned.
    """

    # A result scoring above this is accepted immediately (no further methods).
    GOOD_SCORE = 50
    # A result must score above this to be returned at all.
    MIN_ACCEPTABLE_SCORE = 20
    # Extracted text must be longer than this for a method to report success.
    MIN_CONTENT_LENGTH = 200

    # Matches any character in the CJK Unified Ideographs block.
    _CJK_RE = re.compile(r'[\u4e00-\u9fff]')

    # Date formats recognised in metadata, most specific first.
    _DATE_PATTERNS = tuple(re.compile(p) for p in (
        r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}',
        r'\d{4}/\d{2}/\d{2}',
        r'\d{4}-\d{2}-\d{2}',
        r'\d{2}/\d{2}/\d{4}',
    ))

    # Noise removed from article text before line filtering.
    # NOTE(review): several of these (e.g. '接受.*', '登入.*', '大事件.*') delete
    # everything up to the end of the line and can truncate genuine sentences
    # that happen to contain the phrase — confirm this is acceptable.
    _NOISE_PATTERNS = tuple(
        re.compile(p, re.IGNORECASE | re.MULTILINE) for p in (
            r'!\[Image \d+: .*?\]',
            r'Image \d+:',
            r'ADVERTISEMENT',
            r'Sponsored Content',
            r'点击这里.*',
            r'更多新闻.*',
            r'相关新闻.*',
            r'热门搜索.*',
            r'大事件.*',
            r'Copyright.*All rights reserved',
            r'本网站.*Cookies',
            r'了解更多.*',
            r'接受.*',
            r'简\s*繁',
            r'登入.*',
            r'下载APP.*',
            r'[\*\-\=]{5,}',
            r'^\s*\d+\s*$',  # Line with only numbers
        )
    )
    _NUMERIC_LINE_RE = re.compile(r'^[\d\s\.\-]+$')
    _BOILERPLATE_LINE_RE = re.compile(r'cookie|隐私|版权|广告')
    _FALLBACK_SKIP_RE = re.compile(
        r'cookie|privacy|copyright|advertisement|newsletter|subscribe')

    def __init__(self):
        # User-Agent strings sent with outgoing requests (desktop and mobile).
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Mobile Safari/537.36",
        ]

    def extract_article(self, url: str) -> Dict[str, Any]:
        """Extract article content, trying each method until one is good enough.

        Args:
            url: Article URL; ``https://`` is prepended when no scheme is given.

        Returns:
            The best-scoring result dict (with ``score``, ``execution_time``
            and ``method`` added) when its score exceeds
            ``MIN_ACCEPTABLE_SCORE``; otherwise a dict with ``success`` False
            and an ``error`` message.
        """
        start_time = time.time()
        url = (url or "").strip()
        if not url:
            return {
                "success": False,
                "url": url,
                "error": "URL is required",
                "execution_time": 0.0,
            }
        logger.info("📰 Extracting article from: %s", url)
        # Ensure URL has protocol
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url
        # NOTE(review): the URL is fetched as given; if untrusted clients can
        # reach this service, consider rejecting private/internal hosts.
        methods = [
            self._extract_with_readability,
            self._extract_with_jina,
            self._extract_with_selectors,
            self._extract_fallback,
        ]
        best_result = None
        best_score = 0
        last_index = len(methods) - 1
        for i, method in enumerate(methods):
            try:
                logger.info(" Trying method %d: %s", i + 1, method.__name__)
                result = method(url)
                if result.get("success"):
                    score = self._score_article(result)
                    result["score"] = score
                    logger.info(" ✓ Method %d score: %s", i + 1, score)
                    if score > best_score:
                        best_score = score
                        best_result = result
                    # If we have a good score, return early
                    if score > self.GOOD_SCORE:
                        break
            except Exception as e:  # one failing strategy must not stop the rest
                logger.error(" Method %d failed: %s", i + 1, e)
            # Brief pause between attempts; pointless after the final one.
            if i < last_index:
                time.sleep(1)
        if best_result and best_score > self.MIN_ACCEPTABLE_SCORE:
            best_result["execution_time"] = round(time.time() - start_time, 2)
            best_result["method"] = "article_extraction"
            return best_result
        return {
            "success": False,
            "url": url,
            "error": "Could not extract article content",
            "execution_time": round(time.time() - start_time, 2),
        }

    def _extract_with_readability(self, url: str) -> Dict[str, Any]:
        """Fetch the page and extract the article body with readability-lxml."""
        try:
            # "Accept-Encoding" is deliberately left to requests: advertising
            # "br" by hand yields undecodable bodies when brotli support is
            # not installed.
            headers = {
                "User-Agent": self.user_agents[0],
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
                "DNT": "1",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "none",
                "Sec-Fetch-User": "?1",
                "Cache-Control": "max-age=0",
                "Referer": "https://www.google.com/",  # Pretend we came from Google
            }
            # NOTE(review): verify=False disables TLS certificate checks.
            response = requests.get(url, headers=headers, timeout=20, verify=False)
            if response.status_code != 200:
                return {"success": False, "error": f"Status: {response.status_code}"}
            # Without a charset in the header requests assumes ISO-8859-1 for
            # text/*, which garbles Chinese pages; detect from the body instead.
            if "charset" not in response.headers.get("Content-Type", "").lower():
                response.encoding = response.apparent_encoding
            page_html = response.text
            doc = Document(page_html)
            article_html = doc.summary()
            title = doc.title() or ""
            # Convert the extracted HTML fragment to plain text, then clean it.
            soup = BeautifulSoup(article_html, 'html.parser')
            article_text = soup.get_text(separator='\n', strip=True)
            cleaned_text = self._clean_article_text(article_text)
            if len(cleaned_text) <= self.MIN_CONTENT_LENGTH:
                return {"success": False, "error": "Readability content too short"}
            return {
                "success": True,
                "url": url,
                "title": title[:200],
                "main_content": cleaned_text,
                "content_length": len(cleaned_text),
                "content_preview": cleaned_text[:500] + ("..." if len(cleaned_text) > 500 else ""),
                "source": "readability",
                "status": response.status_code,
                "metadata": self._extract_metadata(page_html),
            }
        except Exception as e:
            return {"success": False, "error": f"Readability error: {str(e)}"}

    @staticmethod
    def _parse_jina_json(raw: str):
        """Return ``(content, title)`` from a Jina Reader JSON body.

        Accepts both the nested ``{"data": {"content": ..., "title": ...}}``
        shape and a flat ``{"content": ...}`` object.  When *raw* is not a
        JSON object, or has no string ``content``, *raw* itself is returned
        as the content; a missing title is returned as ``""``.
        """
        try:
            payload = json.loads(raw)
        except ValueError:
            return raw, ""
        if not isinstance(payload, dict):
            return raw, ""
        inner = payload.get("data")
        node = inner if isinstance(inner, dict) else payload
        content = node.get("content")
        title = node.get("title")
        return (
            content if isinstance(content, str) else raw,
            title.strip() if isinstance(title, str) else "",
        )

    @staticmethod
    def _title_from_text(content: str) -> str:
        """Return a title found in the first five lines of *content*, or ``""``.

        A title line starts with ``Title:`` or with the Markdown heading
        marker ``# ``; only that prefix is removed.
        """
        for line in content.split('\n')[:5]:
            for prefix in ('Title:', '# '):
                if line.startswith(prefix):
                    return line[len(prefix):].strip()
        return ""

    def _extract_with_jina(self, url: str) -> Dict[str, Any]:
        """Fetch the article through Jina Reader, trying several Accept types."""
        try:
            jina_url = f"https://r.jina.ai/{url}"
            for accept in ("text/plain", "application/json", "text/markdown"):
                try:
                    response = requests.get(
                        jina_url,
                        headers={
                            "Accept": accept,
                            "User-Agent": self.user_agents[0],
                        },
                        timeout=25,
                    )
                    if response.status_code != 200:
                        continue
                    content = response.text
                    json_title = ""
                    if accept == "application/json":
                        content, json_title = self._parse_jina_json(content)
                    cleaned = self._clean_article_text(content)
                    title = json_title or self._title_from_text(content) or "Jina提取"
                    if len(cleaned) > self.MIN_CONTENT_LENGTH:
                        return {
                            "success": True,
                            "url": url,
                            "title": title[:200],
                            "main_content": cleaned,
                            "content_length": len(cleaned),
                            "source": f"jina_{accept}",
                            "status": response.status_code,
                        }
                except Exception as e:
                    logger.warning("Jina attempt with %s failed: %s", accept, e)
                    continue
            return {"success": False, "error": "All Jina attempts failed"}
        except Exception as e:
            return {"success": False, "error": f"Jina error: {str(e)}"}

    def _extract_with_selectors(self, url: str) -> Dict[str, Any]:
        """Extract using CSS selectors chosen for sinchew.com.my-style pages."""
        try:
            headers = {
                "User-Agent": self.user_agents[1],
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            }
            # NOTE(review): verify=False disables TLS certificate checks.
            response = requests.get(url, headers=headers, timeout=15, verify=False)
            if response.status_code != 200:
                return {"success": False, "error": f"Status: {response.status_code}"}
            soup = BeautifulSoup(response.content, 'html.parser')
            # Remove unwanted elements
            for unwanted in soup.find_all(['script', 'style', 'nav', 'header', 'footer',
                                           'aside', 'form', 'iframe', 'button', 'svg']):
                unwanted.decompose()
            selectors_to_try = [
                'div.entry-content',
                'article',
                'div.post-content',
                'div.content-area',
                'div.article-content',
                'div.story-content',
                'div[itemprop="articleBody"]',
                'div.article-body',
                'div.main-content',
                'div.news-content',
            ]
            # Keep the longest text produced by any matching selector.
            article_text = ""
            for selector in selectors_to_try:
                element = soup.select_one(selector)
                if element:
                    text = element.get_text(separator='\n', strip=True)
                    if len(text) > len(article_text):
                        article_text = text
            # If the selectors found little, fall back to long paragraphs
            # that contain Chinese characters (first 20 only).
            if len(article_text) < 300:
                paragraph_texts = (p.get_text(strip=True) for p in soup.find_all('p'))
                chinese_paragraphs = [
                    text for text in paragraph_texts
                    if len(text) > 50 and self._CJK_RE.search(text)
                ]
                if chinese_paragraphs:
                    article_text = '\n\n'.join(chinese_paragraphs[:20])
            cleaned_text = self._clean_article_text(article_text)
            if len(cleaned_text) <= self.MIN_CONTENT_LENGTH:
                return {"success": False, "error": "Selector content too short"}
            title = soup.find('title')
            title_text = title.get_text(strip=True) if title else "新闻标题"
            return {
                "success": True,
                "url": url,
                "title": title_text[:200],
                "date": self._extract_date_from_soup(soup),
                "main_content": cleaned_text,
                "content_length": len(cleaned_text),
                "source": "selectors",
                "status": response.status_code,
            }
        except Exception as e:
            return {"success": False, "error": f"Selector error: {str(e)}"}

    def _extract_fallback(self, url: str) -> Dict[str, Any]:
        """Last resort: keep long Chinese text lines from the whole page."""
        try:
            # NOTE(review): verify=False disables TLS certificate checks.
            response = requests.get(url, timeout=10, verify=False)
            if response.status_code != 200:
                return {"success": False, "error": "Fallback extraction failed"}
            soup = BeautifulSoup(response.content, 'html.parser')
            # Drop scripts, styles and page-chrome elements before reading text.
            for tag in soup.find_all(['script', 'style', 'nav', 'header', 'footer',
                                      'aside', 'form', 'iframe', 'button']):
                tag.decompose()
            all_text = soup.get_text(separator='\n', strip=True)
            # Keep lines that are long enough, contain Chinese, are not
            # boilerplate, and are not bare links.
            filtered_lines = []
            for line in all_text.split('\n'):
                line = line.strip()
                if (len(line) > 30
                        and self._CJK_RE.search(line)
                        and not self._FALLBACK_SKIP_RE.search(line.lower())
                        and not line.startswith('http')):
                    filtered_lines.append(line)
            cleaned_text = '\n\n'.join(filtered_lines[:50])
            if len(cleaned_text) <= self.MIN_CONTENT_LENGTH:
                return {"success": False, "error": "Fallback extraction failed"}
            title = soup.find('title')
            title_text = title.get_text(strip=True) if title else "内容提取"
            return {
                "success": True,
                "url": url,
                "title": title_text[:150],
                "main_content": cleaned_text,
                "content_length": len(cleaned_text),
                "source": "fallback",
                "status": response.status_code,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _extract_metadata(self, html_content: str) -> Dict[str, str]:
        """Return ``date`` and ``author`` found in *html_content*, when present."""
        metadata = {}
        soup = BeautifulSoup(html_content, 'html.parser')
        date = self._extract_date_from_soup(soup)
        if date:
            metadata["date"] = date
        author_selectors = [
            'meta[name="author"]',
            'meta[property="article:author"]',
            '.author',
            '.byline',
            'span[itemprop="author"]',
        ]
        # The first selector that yields a non-empty author wins.
        for selector in author_selectors:
            element = soup.select_one(selector)
            if not element:
                continue
            if element.name == 'meta':
                author = element.get('content', '')
            else:
                author = element.get_text(strip=True)
            if author:
                metadata["author"] = author
                break
        return metadata

    def _extract_date_from_soup(self, soup) -> str:
        """Return the first recognisable date string in *soup*, or ``""``.

        Candidate elements are probed in a fixed order (meta tags, ``<time>``,
        then common date classes); the first text matching one of
        ``_DATE_PATTERNS`` is returned as matched.
        """
        date_selectors = [
            'meta[property="article:published_time"]',
            'meta[name="pubdate"]',
            'meta[name="date"]',
            'time',
            '.date',
            '.published',
            '.post-date',
            '.article-date',
        ]
        for selector in date_selectors:
            element = soup.select_one(selector)
            if not element:
                continue
            if element.name == 'meta':
                date_str = element.get('content', '')
            elif element.name == 'time':
                date_str = element.get('datetime', '') or element.get_text(strip=True)
            else:
                date_str = element.get_text(strip=True)
            if not date_str:
                continue
            for pattern in self._DATE_PATTERNS:
                match = pattern.search(date_str)
                if match:
                    return match.group()
        return ""

    def _clean_article_text(self, text: str) -> str:
        """Strip noise from *text* and return paragraphs separated by blank lines.

        Noise patterns are removed, then lines are kept only when they are
        longer than 20 characters, are not bare links or number runs, and do
        not mention cookie/privacy/copyright/advertising terms.  Consecutive
        duplicate lines are collapsed.  Runs of spaces/tabs inside a line are
        reduced to one space; the blank line between paragraphs is preserved
        so that callers (and ``_score_article``) can see paragraph structure.
        """
        if not text:
            return ""
        for pattern in self._NOISE_PATTERNS:
            text = pattern.sub('', text)
        cleaned_lines = []
        for line in text.split('\n'):
            line = line.strip()
            if (len(line) > 20
                    and not line.startswith(('http://', 'https://', 'www.'))
                    and not self._NUMERIC_LINE_RE.search(line)
                    and not self._BOILERPLATE_LINE_RE.search(line.lower())):
                # Remove duplicate consecutive lines
                if not cleaned_lines or cleaned_lines[-1] != line:
                    cleaned_lines.append(line)
        text = '\n\n'.join(cleaned_lines)
        # Collapse horizontal whitespace only; newlines carry paragraph breaks.
        text = re.sub(r'[^\S\n]+', ' ', text)
        return text.strip()

    def _score_article(self, result: Dict[str, Any]) -> int:
        """Score an extraction result; higher means more article-like.

        Points are awarded for content length, paragraph count (blank-line
        separated), Chinese news keywords, the presence of Chinese text, and
        a bonus when the source is readability.  Unsuccessful results score 0.
        """
        if not result.get("success"):
            return 0
        score = 0
        content = result.get("main_content", "")
        # Length score
        length = len(content)
        if length > 800:
            score += 30
        elif length > 500:
            score += 20
        elif length > 300:
            score += 10
        # Paragraph count
        paragraphs = content.count('\n\n') + 1
        if paragraphs > 3:
            score += 15
        elif paragraphs > 1:
            score += 5
        # News keywords in Chinese: 2 points for each keyword present.
        news_keywords_chinese = ['报道', '新闻', '记者', '警方', '调查', '发生', '表示',
                                 '指出', '据知', '据了解', '据悉', '事件', '事故', '案件',
                                 '透露', '说明', '强调', '要求', '建议', '认为']
        score += 2 * sum(1 for keyword in news_keywords_chinese if keyword in content)
        # Check for Chinese text
        if self._CJK_RE.search(content):
            score += 20
        # Source bonus
        if "readability" in result.get("source", ""):
            score += 10
        return score
# ==============================================
# INITIALIZE
# ==============================================
# One shared extractor instance, used by both the API and the Gradio UI.
extractor = NewsArticleExtractor()

# ==============================================
# FASTAPI APP
# ==============================================
fastapi_app = FastAPI(
    title="News Article Extractor",
    description="Extracts news articles using readability-lxml",
    version="4.0",
)

# CORS: any origin may call the API.  Credentials stay disabled because the
# service uses no cookies or auth, and wildcard origins must not be combined
# with allow_credentials=True.
fastapi_app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
async def root():
    """Return a static description of the service and its endpoints."""
    # NOTE(review): no route decorator is visible on this handler in this
    # file, and the Gradio UI is mounted at "/" — confirm which path (if any)
    # this is meant to be served on before registering it.
    endpoints = {
        "GET /": "This info",
        "GET /health": "Health check",
        "POST /extract": "Extract article content",
    }
    info = {
        "service": "News Article Extractor",
        "version": "4.0",
        "description": "Extracts news articles using multiple methods including readability-lxml",
        "endpoints": endpoints,
    }
    return info
# Registered explicitly: the service description and the startup banner both
# advertise GET /health, but the handler was never attached to the app.
@fastapi_app.get("/health")
async def health():
    """Health check: report a fixed status plus the current Unix timestamp."""
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "service": "article_extractor",
    }
# Registered explicitly: POST /extract is advertised by the service
# description and the startup banner, but the handler was never attached.
@fastapi_app.post("/extract")
async def api_extract(request: Request):
    """API endpoint for n8n: extract the article at the posted URL.

    Expects a JSON object body with a string ``url`` field.

    Returns:
        The extractor's result dict, or a JSON error response with status
        400 (invalid JSON, non-object body, missing or non-string URL) or
        500 (unexpected failure).
    """
    try:
        body = await request.json()
    except (json.JSONDecodeError, UnicodeDecodeError):
        return JSONResponse(
            status_code=400,
            content={"success": False, "error": "Invalid JSON"}
        )
    try:
        # A JSON array/number/string is valid JSON but not a usable request.
        if not isinstance(body, dict):
            return JSONResponse(
                status_code=400,
                content={"success": False, "error": "JSON body must be an object"}
            )
        url = body.get("url", "")
        if not isinstance(url, str) or not url.strip():
            return JSONResponse(
                status_code=400,
                content={"success": False, "error": "URL is required"}
            )
        url = url.strip()
        logger.info("📰 API Request: %s", url)
        start_time = time.time()
        # extract_article blocks (HTTP requests and sleeps); run it in the
        # thread pool so the event loop keeps serving other requests.
        result = await run_in_threadpool(extractor.extract_article, url)
        elapsed = time.time() - start_time
        logger.info(" Extraction completed in %.2fs", elapsed)
        logger.info(" Success: %s", result.get('success'))
        logger.info(" Content length: %s", result.get('content_length', 0))
        logger.info(" Method used: %s", result.get('method', 'unknown'))
        return result
    except Exception as e:
        logger.error("API Error: %s", traceback.format_exc())
        return JSONResponse(
            status_code=500,
            content={
                "success": False,
                "error": str(e)
            }
        )
# ==============================================
# GRADIO INTERFACE
# ==============================================
def gradio_extract(url: str):
    """Gradio callback: extract an article and format it as Markdown.

    Args:
        url: Article URL typed into the UI.

    Returns:
        A ``(markdown, raw_result)`` tuple.  For an empty or whitespace-only
        URL the raw result is an empty dict.
    """
    if not url or not url.strip():
        return "❌ 请输入URL", {}
    result = extractor.extract_article(url.strip())
    if not result.get("success"):
        error = result.get("error", "未知错误")
        return f"## ❌ 提取失败\n\n**错误:** {error}\n\n**URL:** {result.get('url', '未知')}", result
    content = result.get("main_content", "")
    title = result.get("title", "无标题")
    # Blank lines separate the blocks: without them Markdown merges the
    # metadata lines into one paragraph and reads text directly above "---"
    # as a heading instead of drawing a horizontal rule.
    sections = [
        f"## 📰 {title}",
        f"**URL:** {result.get('url', url)}",
        f"**提取方法:** {result.get('method', '未知')}",
        f"**提取时间:** {result.get('execution_time', 0)}秒",
        f"**内容长度:** {result.get('content_length', len(content))}字符",
        "---",
        content,
        "---",
        f"*提取完成于 {time.strftime('%Y-%m-%d %H:%M:%S')}*",
    ]
    output = "\n\n".join(sections) + "\n"
    return output, result
# Build the Gradio UI: one URL textbox in; rendered Markdown plus the raw
# result dict out.
gradio_interface = gr.Interface(
    fn=gradio_extract,
    inputs=gr.Textbox(
        label="新闻文章URL",
        placeholder="https://example.com/news/article",
        value="https://northern.sinchew.com.my/?p=7217886",
    ),
    outputs=[
        gr.Markdown(label="文章内容"),
        gr.JSON(label="原始数据"),
    ],
    title="📰 新闻文章提取器 v4.0",
    description="使用readability-lxml提取新闻文章主要内容",
    # Sample article URLs offered below the input box.
    examples=[
        ["https://northern.sinchew.com.my/?p=7217886"],
        ["https://www.sinchew.com.my/?p=7234965"],
        ["https://www.zaobao.com.sg/realtime/china/story20250127-1525893"],
    ],
)

# ==============================================
# MOUNT GRADIO TO FASTAPI
# ==============================================
# The Gradio UI is attached to the FastAPI app at the root path; `app` is the
# object handed to uvicorn.
app = gr.mount_gradio_app(fastapi_app, gradio_interface, path="/")
# ==============================================
# LAUNCH THE APP
# ==============================================
if __name__ == "__main__":
    # Startup banner, printed one entry per line, followed by the server start.
    rule = "=" * 60
    banner = [
        "\n" + rule,
        "📰 新闻文章提取器 v4.0 启动",
        rule,
        "特性:",
        "• 使用readability-lxml进行智能文章提取",
        "• 多种提取方法备用",
        "• 专门优化中文新闻网站",
        "• 自动内容评分系统",
        rule,
        "API端点:",
        "• GET /health - 健康检查",
        "• POST /extract - 提取文章内容",
        rule + "\n",
    ]
    for banner_line in banner:
        print(banner_line)
    # Serve on all interfaces, port 7860.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info",
    )