mai / scraper.py
Sooteemon's picture
Update scraper.py
9d7be98 verified
import requests
import feedparser
from urllib.parse import quote
from datetime import datetime, timedelta, timezone # <--- เพิ่มใหม่
import calendar # <--- เพิ่มใหม่
# --- ไม่จำเป็นต้องใช้ BeautifulSoup อีกต่อไป ---
# from bs4 import BeautifulSoup
# import time
# from datetime import datetime
class YahooFinanceScraper:
def __init__(self):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
def _parse_feed(self, url, max_articles=10):
"""ฟังก์ชันช่วยในการดึงและแปลง RSS feed (พร้อมกรอง 7 วันล่าสุด)"""
news_list = []
try:
# <--- เพิ่มใหม่: กำหนดเวลาตัดออก (7 วันย้อนหลัง)
# ใช้ .now(timezone.utc) เพื่อให้เป็นการเปรียบเทียบที่ถูกต้อง
cutoff_date_utc = datetime.now(timezone.utc) - timedelta(days=7)
feed = feedparser.parse(url, agent=self.headers['User-Agent'])
# <--- แก้ไข: วนลูป feed.entries ทั้งหมดเพื่อค้นหาข่าวที่ตรงเงื่อนไข
for entry in feed.entries:
# <--- เพิ่มใหม่: ตรวจสอบและแปลงวันที่
published_struct = entry.get('published_parsed')
if not published_struct:
continue # ข้ามข่าวนี้ถ้าไม่มีวันที่ที่ parse ได้
try:
# feedparser ให้ published_parsed เป็น UTC struct_time
# เราต้องใช้ calendar.timegm เพื่อแปลงเป็น timestamp (UTC)
article_timestamp = calendar.timegm(published_struct)
article_date_utc = datetime.fromtimestamp(article_timestamp, tz=timezone.utc)
except Exception:
continue # ข้ามหากแปลงวันที่ไม่ได้
# <--- เพิ่มใหม่: ทำการกรองเวลา
if article_date_utc >= cutoff_date_utc:
# ถ้าข่าวนี้อยู่ในช่วง 7 วัน ให้เพิ่มลง list
news_list.append({
'title': entry.get('title', 'No title'),
'link': entry.get('link', '').split('&url=')[-1],
'summary': entry.get('summary', '')[:300],
# ใช้ .isoformat() เพื่อเก็บวันที่ในรูปแบบมาตรฐาน
'published': article_date_utc.isoformat()
})
# <--- เพิ่มใหม่: ถ้าได้ข่าวครบตามจำนวน max_articles แล้ว ให้หยุด
if len(news_list) >= max_articles:
break
else:
# <--- เพิ่มใหม่: (Optimization)
# ถ้าเจอข่าวที่เก่ากว่า 7 วัน ให้หยุดค้นหาต่อ
# (เพราะ RSS feed มักจะเรียงจากใหม่ไปเก่า)
break
except Exception as e:
print(f"Error parsing feed: {e}")
return self._get_fallback_news(1)
return news_list # คืนค่า list ที่กรองแล้ว
def get_latest_news(self, symbol="", max_articles=10):
"""
ดึงข่าวล่าสุดจาก Yahoo Finance (แก้ไขใหม่ให้ใช้ RSS เสมอ)
(ฟังก์ชันนี้ไม่ต้องแก้ เพราะ _parse_feed ถูกแก้แล้ว)
"""
if symbol:
url = f"https://finance.yahoo.com/rss/quotes/{symbol.upper()}"
else:
url = "https://finance.yahoo.com/news/rssindex"
news_list = self._parse_feed(url, max_articles)
if symbol and not news_list:
return self._get_fallback_news(max_articles)
return news_list
def _get_fallback_news(self, max_articles):
"""
วิธีสำรองในกรณีที่ดึงข่าวไม่ได้ (ใช้ Top Stories feed)
(ฟังก์ชันนี้จะถูกกรอง 7 วันโดยอัตโนมัติ เพราะเรียกใช้ _parse_feed)
"""
try:
url = "https://finance.yahoo.com/rss/topstories"
return self._parse_feed(url, max_articles) # <--- จะถูกกรอง 7 วันอัตโนมัติ
except:
return [{
'title': 'Unable to fetch news',
'link': '',
'summary': 'Please try again later',
'published': 'N/A'
}]
def search_news(self, keyword, max_articles=10):
"""
ค้นหาข่าวด้วย keyword (แก้ไขใหม่ให้ใช้ Google News RSS ซึ่งเสถียรกว่ามาก)
(ฟังก์ชันนี้จะถูกกรอง 7 วันโดยอัตโนมัติ เพราะเรียกใช้ _parse_feed)
"""
if not keyword:
return self.get_latest_news(max_articles=max_articles)
try:
safe_keyword = quote(keyword)
url = f"https://news.google.com/rss/search?q={safe_keyword}+site:finance.yahoo.com&hl=en-US&gl=US&ceid=US:en"
news_list = self._parse_feed(url, max_articles) # <--- จะถูกกรอง 7 วันอัตโนมัติ
return news_list if news_list else self._get_fallback_news(max_articles)
except Exception as e:
print(f"Search error: {e}")
return self._get_fallback_news(max_articles)
# --- ตัวอย่างการใช้งาน (เหมือนเดิม) ---
if __name__ == '__main__':
scraper = YahooFinanceScraper()
print("--- ข่าวล่าสุด (ไม่ระบุสัญลักษณ์) (กรอง 7 วัน) ---")
latest_news = scraper.get_latest_news(max_articles=5)
for news in latest_news:
print(f"[{news['published']}] {news['title']}")
print("\n--- ข่าวหุ้น AAPL (กรอง 7 วัน) ---")
aapl_news = scraper.get_latest_news("AAPL", max_articles=5)
for news in aapl_news:
print(f"[{news['published']}] {news['title']}")
print("\n--- ค้นหาคำว่า 'NVIDIA' (กรอง 7 วัน) ---")
search_results = scraper.search_news("NVIDIA", max_articles=5)
for news in search_results:
print(f"[{news['published']}] {news['title']}")