File size: 7,421 Bytes
c8dffe6 9d7be98 bf00b84 9d7be98 bf00b84 c8dffe6 bf00b84 9d7be98 c8dffe6 9d7be98 c8dffe6 9d7be98 bf00b84 9d7be98 bf00b84 9d7be98 bf00b84 9d7be98 bf00b84 9d7be98 bf00b84 9d7be98 bf00b84 9d7be98 bf00b84 9d7be98 bf00b84 9d7be98 c8dffe6 bf00b84 c8dffe6 bf00b84 9d7be98 c8dffe6 bf00b84 9d7be98 bf00b84 9d7be98 bf00b84 9d7be98 c8dffe6 9d7be98 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
import requests
import feedparser
from urllib.parse import quote
from datetime import datetime, timedelta, timezone # <--- เพิ่มใหม่
import calendar # <--- เพิ่มใหม่
# --- ไม่จำเป็นต้องใช้ BeautifulSoup อีกต่อไป ---
# from bs4 import BeautifulSoup
# import time
# from datetime import datetime
class YahooFinanceScraper:
def __init__(self):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
def _parse_feed(self, url, max_articles=10):
"""ฟังก์ชันช่วยในการดึงและแปลง RSS feed (พร้อมกรอง 7 วันล่าสุด)"""
news_list = []
try:
# <--- เพิ่มใหม่: กำหนดเวลาตัดออก (7 วันย้อนหลัง)
# ใช้ .now(timezone.utc) เพื่อให้เป็นการเปรียบเทียบที่ถูกต้อง
cutoff_date_utc = datetime.now(timezone.utc) - timedelta(days=7)
feed = feedparser.parse(url, agent=self.headers['User-Agent'])
# <--- แก้ไข: วนลูป feed.entries ทั้งหมดเพื่อค้นหาข่าวที่ตรงเงื่อนไข
for entry in feed.entries:
# <--- เพิ่มใหม่: ตรวจสอบและแปลงวันที่
published_struct = entry.get('published_parsed')
if not published_struct:
continue # ข้ามข่าวนี้ถ้าไม่มีวันที่ที่ parse ได้
try:
# feedparser ให้ published_parsed เป็น UTC struct_time
# เราต้องใช้ calendar.timegm เพื่อแปลงเป็น timestamp (UTC)
article_timestamp = calendar.timegm(published_struct)
article_date_utc = datetime.fromtimestamp(article_timestamp, tz=timezone.utc)
except Exception:
continue # ข้ามหากแปลงวันที่ไม่ได้
# <--- เพิ่มใหม่: ทำการกรองเวลา
if article_date_utc >= cutoff_date_utc:
# ถ้าข่าวนี้อยู่ในช่วง 7 วัน ให้เพิ่มลง list
news_list.append({
'title': entry.get('title', 'No title'),
'link': entry.get('link', '').split('&url=')[-1],
'summary': entry.get('summary', '')[:300],
# ใช้ .isoformat() เพื่อเก็บวันที่ในรูปแบบมาตรฐาน
'published': article_date_utc.isoformat()
})
# <--- เพิ่มใหม่: ถ้าได้ข่าวครบตามจำนวน max_articles แล้ว ให้หยุด
if len(news_list) >= max_articles:
break
else:
# <--- เพิ่มใหม่: (Optimization)
# ถ้าเจอข่าวที่เก่ากว่า 7 วัน ให้หยุดค้นหาต่อ
# (เพราะ RSS feed มักจะเรียงจากใหม่ไปเก่า)
break
except Exception as e:
print(f"Error parsing feed: {e}")
return self._get_fallback_news(1)
return news_list # คืนค่า list ที่กรองแล้ว
def get_latest_news(self, symbol="", max_articles=10):
"""
ดึงข่าวล่าสุดจาก Yahoo Finance (แก้ไขใหม่ให้ใช้ RSS เสมอ)
(ฟังก์ชันนี้ไม่ต้องแก้ เพราะ _parse_feed ถูกแก้แล้ว)
"""
if symbol:
url = f"https://finance.yahoo.com/rss/quotes/{symbol.upper()}"
else:
url = "https://finance.yahoo.com/news/rssindex"
news_list = self._parse_feed(url, max_articles)
if symbol and not news_list:
return self._get_fallback_news(max_articles)
return news_list
def _get_fallback_news(self, max_articles):
"""
วิธีสำรองในกรณีที่ดึงข่าวไม่ได้ (ใช้ Top Stories feed)
(ฟังก์ชันนี้จะถูกกรอง 7 วันโดยอัตโนมัติ เพราะเรียกใช้ _parse_feed)
"""
try:
url = "https://finance.yahoo.com/rss/topstories"
return self._parse_feed(url, max_articles) # <--- จะถูกกรอง 7 วันอัตโนมัติ
except:
return [{
'title': 'Unable to fetch news',
'link': '',
'summary': 'Please try again later',
'published': 'N/A'
}]
def search_news(self, keyword, max_articles=10):
"""
ค้นหาข่าวด้วย keyword (แก้ไขใหม่ให้ใช้ Google News RSS ซึ่งเสถียรกว่ามาก)
(ฟังก์ชันนี้จะถูกกรอง 7 วันโดยอัตโนมัติ เพราะเรียกใช้ _parse_feed)
"""
if not keyword:
return self.get_latest_news(max_articles=max_articles)
try:
safe_keyword = quote(keyword)
url = f"https://news.google.com/rss/search?q={safe_keyword}+site:finance.yahoo.com&hl=en-US&gl=US&ceid=US:en"
news_list = self._parse_feed(url, max_articles) # <--- จะถูกกรอง 7 วันอัตโนมัติ
return news_list if news_list else self._get_fallback_news(max_articles)
except Exception as e:
print(f"Search error: {e}")
return self._get_fallback_news(max_articles)
# --- ตัวอย่างการใช้งาน (เหมือนเดิม) ---
if __name__ == '__main__':
scraper = YahooFinanceScraper()
print("--- ข่าวล่าสุด (ไม่ระบุสัญลักษณ์) (กรอง 7 วัน) ---")
latest_news = scraper.get_latest_news(max_articles=5)
for news in latest_news:
print(f"[{news['published']}] {news['title']}")
print("\n--- ข่าวหุ้น AAPL (กรอง 7 วัน) ---")
aapl_news = scraper.get_latest_news("AAPL", max_articles=5)
for news in aapl_news:
print(f"[{news['published']}] {news['title']}")
print("\n--- ค้นหาคำว่า 'NVIDIA' (กรอง 7 วัน) ---")
search_results = scraper.search_news("NVIDIA", max_articles=5)
for news in search_results:
print(f"[{news['published']}] {news['title']}") |